!str #15271 make transform() take a factory instead of Transformer

+ Makes reusing flows safe
+ Adds timerTransform for ops which need TimerTransformer
+ Adds TransformerLike in order to keep transform/timerTransform
  typesafe in respect to the passed in type og Transformer

Resolves #15271
This commit is contained in:
Konrad 'ktoso' Malawski 2014-08-22 11:42:05 +02:00
parent 24254c6b48
commit 5b1c05f3fe
26 changed files with 368 additions and 281 deletions

View file

@ -3,34 +3,18 @@
*/
package akka.stream.impl
import java.util.concurrent.atomic.AtomicLong
import akka.actor.{ Actor, ActorCell, ActorRef, ActorSystem, ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider, LocalActorRef, Props, RepointableActorRef }
import akka.pattern.ask
import akka.stream._
import org.reactivestreams.{ Processor, Publisher, Subscriber }
import scala.annotation.tailrec
import scala.collection.immutable
import org.reactivestreams.{ Publisher, Subscriber, Processor }
import akka.actor.ActorRefFactory
import akka.stream.{ OverflowStrategy, MaterializerSettings, FlowMaterializer, Transformer }
import scala.util.Try
import scala.concurrent.Future
import scala.util.Success
import scala.util.Failure
import java.util.concurrent.atomic.AtomicLong
import akka.actor.ActorContext
import akka.actor.ExtensionIdProvider
import akka.actor.ExtensionId
import akka.actor.ExtendedActorSystem
import akka.actor.ActorSystem
import akka.actor.Extension
import scala.concurrent.duration.FiniteDuration
import akka.stream.TimerTransformer
import akka.actor.Props
import akka.actor.Actor
import akka.actor.ActorRef
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._
import scala.concurrent.Await
import akka.actor.LocalActorRef
import akka.actor.RepointableActorRef
import akka.actor.ActorCell
import scala.util.{ Failure, Success }
/**
* INTERNAL API
@ -40,9 +24,8 @@ private[akka] object Ast {
def name: String
}
case class Transform(transformer: Transformer[Any, Any]) extends AstNode {
override def name = transformer.name
}
case class Transform(name: String, mkTransformer: () Transformer[Any, Any]) extends AstNode
case class TimerTransform(name: String, mkTransformer: () TimerTransformer[Any, Any]) extends AstNode
case class MapFuture(f: Any Future[Any]) extends AstNode {
override def name = "mapFuture"
}
@ -137,8 +120,7 @@ private[akka] case class ActorBasedFlowMaterializer(
flowNameCounter: AtomicLong,
namePrefix: String)
extends FlowMaterializer(settings) {
import Ast._
import ActorBasedFlowMaterializer._
import akka.stream.impl.Ast._
def withNamePrefix(name: String): FlowMaterializer = this.copy(namePrefix = name)
@ -170,7 +152,7 @@ private[akka] case class ActorBasedFlowMaterializer(
}
}
private val identityTransform = Transform(
private val identityTransform = Transform("identity", ()
new Transformer[Any, Any] {
override def onNext(element: Any) = List(element)
})
@ -178,7 +160,6 @@ private[akka] case class ActorBasedFlowMaterializer(
def processorForNode(op: AstNode, flowName: String, n: Int): Processor[Any, Any] = {
val impl = actorOf(ActorProcessor.props(settings, op), s"$flowName-$n-${op.name}")
ActorProcessor(impl)
}
def actorOf(props: Props, name: String): ActorRef = supervisor match {