!str #16993 Separate scaladsl/javadsl for FlattenStategy
`abstract class FlattenStrategy` was used in both javadsl and scaladsl, but the concrete concat for the javadsl was in javadsl.FlattenStrategy and the concrete concat for the scaladsl is in stream.FlattenStrategy. Now there are separate FlattenStategy in scaladsl and javadsl packages and conversion as we have in other places. * replace JavaConverters with explicit methods * remove asJava/asScala for FlattenStrategy
This commit is contained in:
parent
4a16067cd8
commit
666bfade1e
22 changed files with 45 additions and 82 deletions
|
|
@ -8,8 +8,7 @@ import language.implicitConversions
|
|||
import language.higherKinds
|
||||
import java.nio.charset.Charset
|
||||
import com.typesafe.config.Config
|
||||
import akka.stream.FlattenStrategy
|
||||
import akka.stream.scaladsl.{ Flow, Source }
|
||||
import akka.stream.scaladsl.{ FlattenStrategy, Flow, Source }
|
||||
import akka.stream.stage._
|
||||
import scala.concurrent.duration.Duration
|
||||
import scala.concurrent.{ Await, Future }
|
||||
|
|
|
|||
|
|
@ -11,7 +11,7 @@ import org.scalatest.{ BeforeAndAfterAll, FreeSpec, Matchers }
|
|||
import org.scalatest.matchers.Matcher
|
||||
import akka.stream.scaladsl._
|
||||
import akka.stream.scaladsl.OperationAttributes._
|
||||
import akka.stream.FlattenStrategy
|
||||
import akka.stream.scaladsl.FlattenStrategy
|
||||
import akka.stream.ActorFlowMaterializer
|
||||
import akka.util.ByteString
|
||||
import akka.actor.ActorSystem
|
||||
|
|
|
|||
|
|
@ -11,7 +11,7 @@ import org.scalatest.{ BeforeAndAfterAll, FreeSpec, Matchers }
|
|||
import org.scalatest.matchers.Matcher
|
||||
import akka.stream.scaladsl._
|
||||
import akka.stream.scaladsl.OperationAttributes._
|
||||
import akka.stream.FlattenStrategy
|
||||
import akka.stream.scaladsl.FlattenStrategy
|
||||
import akka.stream.ActorFlowMaterializer
|
||||
import akka.util.ByteString
|
||||
import akka.actor.ActorSystem
|
||||
|
|
|
|||
|
|
@ -8,7 +8,7 @@ import akka.event.{ NoLogging, LoggingAdapter }
|
|||
|
||||
import scala.concurrent.forkjoin.ThreadLocalRandom
|
||||
import akka.parboiled2.util.Base64
|
||||
import akka.stream.FlattenStrategy
|
||||
import akka.stream.scaladsl.FlattenStrategy
|
||||
import akka.stream.scaladsl._
|
||||
import akka.http.engine.rendering.BodyPartRenderer
|
||||
import akka.http.util.FastFuture
|
||||
|
|
|
|||
|
|
@ -3,10 +3,10 @@
|
|||
*/
|
||||
package akka.stream.tck
|
||||
|
||||
import akka.stream.scaladsl.FlattenStrategy
|
||||
import akka.stream.scaladsl.Sink
|
||||
import akka.stream.scaladsl.Source
|
||||
import org.reactivestreams.Publisher
|
||||
import akka.stream.FlattenStrategy
|
||||
|
||||
class FlattenTest extends AkkaPublisherVerification[Int] {
|
||||
|
||||
|
|
|
|||
|
|
@ -5,7 +5,6 @@ package akka.stream.scaladsl
|
|||
|
||||
import scala.concurrent.duration._
|
||||
import scala.util.control.NoStackTrace
|
||||
import akka.stream.FlattenStrategy
|
||||
import akka.stream.ActorFlowMaterializer
|
||||
import akka.stream.ActorFlowMaterializerSettings
|
||||
import akka.stream.testkit.{ StreamTestKit, AkkaSpec }
|
||||
|
|
|
|||
|
|
@ -5,7 +5,6 @@ package akka.stream.javadsl
|
|||
|
||||
import akka.stream.scaladsl
|
||||
import akka.stream.{ Inlet, Outlet, Shape, Graph, BidiShape }
|
||||
import akka.stream.scaladsl.JavaConverters._
|
||||
import akka.japi.Pair
|
||||
|
||||
trait BidiFlowCreate {
|
||||
|
|
|
|||
|
|
@ -5,7 +5,6 @@ package akka.stream.javadsl
|
|||
|
||||
import akka.stream.scaladsl
|
||||
import akka.stream.{ Inlet, Outlet, Shape, Graph }
|
||||
import akka.stream.scaladsl.JavaConverters._
|
||||
import akka.japi.Pair
|
||||
|
||||
trait FlowCreate {
|
||||
|
|
|
|||
|
|
@ -5,7 +5,6 @@ package akka.stream.javadsl
|
|||
|
||||
import akka.stream.scaladsl
|
||||
import akka.stream.{ Inlet, Shape, Graph }
|
||||
import akka.stream.scaladsl.JavaConverters._
|
||||
|
||||
trait GraphCreate {
|
||||
|
||||
|
|
|
|||
|
|
@ -5,7 +5,6 @@ package akka.stream.javadsl
|
|||
|
||||
import akka.stream.scaladsl
|
||||
import akka.stream.{ Inlet, Shape, Graph }
|
||||
import akka.stream.scaladsl.JavaConverters._
|
||||
|
||||
trait SinkCreate {
|
||||
|
||||
|
|
|
|||
|
|
@ -5,7 +5,6 @@ package akka.stream.javadsl
|
|||
|
||||
import akka.stream.scaladsl
|
||||
import akka.stream.{ Outlet, Shape, Graph }
|
||||
import akka.stream.scaladsl.JavaConverters._
|
||||
|
||||
trait SourceCreate {
|
||||
|
||||
|
|
|
|||
|
|
@ -3,7 +3,12 @@
|
|||
*/
|
||||
package akka.stream.javadsl
|
||||
|
||||
import akka.stream.javadsl
|
||||
import akka.stream.scaladsl
|
||||
|
||||
/**
|
||||
* Strategy that defines how a stream of streams should be flattened into a stream of simple elements.
|
||||
*/
|
||||
abstract class FlattenStrategy[-S, T] extends scaladsl.FlattenStrategy[S, T]
|
||||
|
||||
object FlattenStrategy {
|
||||
|
||||
|
|
@ -12,8 +17,11 @@ object FlattenStrategy {
|
|||
* emitting its elements directly to the output until it completes and then taking the next stream. This has the
|
||||
* consequence that if one of the input stream is infinite, no other streams after that will be consumed from.
|
||||
*/
|
||||
def concat[T]: akka.stream.FlattenStrategy[javadsl.Source[T, Unit], T] =
|
||||
akka.stream.FlattenStrategy.Concat[T]().asInstanceOf[akka.stream.FlattenStrategy[javadsl.Source[T, _], T]]
|
||||
// TODO so in theory this should be safe, but let's rethink the design later
|
||||
def concat[T]: FlattenStrategy[Source[T, Unit], T] = Concat[T]()
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[akka] final case class Concat[T]() extends FlattenStrategy[Source[T, _], T]
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -14,8 +14,6 @@ import akka.stream.impl.StreamLayout
|
|||
|
||||
object Flow {
|
||||
|
||||
import akka.stream.scaladsl.JavaConverters._
|
||||
|
||||
val factory: FlowCreate = new FlowCreate {}
|
||||
|
||||
/** Adapt [[scaladsl.Flow]] for use within Java DSL */
|
||||
|
|
@ -44,12 +42,11 @@ object Flow {
|
|||
/** Create a `Flow` which can process elements of type `T`. */
|
||||
class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph[FlowShape[In, Out], Mat] {
|
||||
import scala.collection.JavaConverters._
|
||||
import akka.stream.scaladsl.JavaConverters._
|
||||
|
||||
override def shape: FlowShape[In, Out] = delegate.shape
|
||||
private[stream] def module: StreamLayout.Module = delegate.module
|
||||
|
||||
/** Converts this Flow to it's Scala DSL counterpart */
|
||||
/** Converts this Flow to its Scala DSL counterpart */
|
||||
def asScala: scaladsl.Flow[In, Out, Mat] = delegate
|
||||
|
||||
/**
|
||||
|
|
@ -401,7 +398,7 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph
|
|||
* Transforms a stream of streams into a contiguous stream of elements using the provided flattening strategy.
|
||||
* This operation can be used on a stream of element type [[Source]].
|
||||
*/
|
||||
def flatten[U](strategy: akka.stream.FlattenStrategy[Out, U]): javadsl.Flow[In, U, Mat] =
|
||||
def flatten[U](strategy: FlattenStrategy[Out, U]): javadsl.Flow[In, U, Mat] =
|
||||
new Flow(delegate.flatten(strategy))
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -279,7 +279,7 @@ object FlowGraph {
|
|||
*/
|
||||
def builder[M](): Builder[M] = new Builder()(new scaladsl.FlowGraph.Builder[M])
|
||||
|
||||
final class Builder[Mat]()(private implicit val delegate: scaladsl.FlowGraph.Builder[Mat]) { self ⇒
|
||||
final class Builder[+Mat]()(private implicit val delegate: scaladsl.FlowGraph.Builder[Mat]) { self ⇒
|
||||
import akka.stream.scaladsl.FlowGraph.Implicits._
|
||||
|
||||
def flow[A, B, M](from: Outlet[A], via: Flow[A, B, M], to: Inlet[B]): Unit = delegate.addEdge(from, via.asScala, to)
|
||||
|
|
|
|||
|
|
@ -17,8 +17,6 @@ import scala.util.Try
|
|||
/** Java API */
|
||||
object Sink {
|
||||
|
||||
import akka.stream.scaladsl.JavaConverters._
|
||||
|
||||
val factory: SinkCreate = new SinkCreate {}
|
||||
|
||||
/** Adapt [[scaladsl.Sink]] for use within Java DSL */
|
||||
|
|
@ -116,7 +114,7 @@ class Sink[-In, +Mat](delegate: scaladsl.Sink[In, Mat]) extends Graph[SinkShape[
|
|||
override def shape: SinkShape[In] = delegate.shape
|
||||
private[stream] def module: StreamLayout.Module = delegate.module
|
||||
|
||||
/** Converts this Sink to it's Scala DSL counterpart */
|
||||
/** Converts this Sink to its Scala DSL counterpart */
|
||||
def asScala: scaladsl.Sink[In, Mat] = delegate
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -23,8 +23,6 @@ import scala.annotation.varargs
|
|||
/** Java API */
|
||||
object Source {
|
||||
|
||||
import scaladsl.JavaConverters._
|
||||
|
||||
val factory: SourceCreate = new SourceCreate {}
|
||||
|
||||
/** Adapt [[scaladsl.Source]] for use within JavaDSL */
|
||||
|
|
@ -181,14 +179,12 @@ object Source {
|
|||
* Can be used as a `Publisher`
|
||||
*/
|
||||
class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[SourceShape[Out], Mat] {
|
||||
import akka.stream.scaladsl.JavaConverters._
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
|
||||
override def shape: SourceShape[Out] = delegate.shape
|
||||
private[stream] def module: StreamLayout.Module = delegate.module
|
||||
|
||||
/** Converts this Java DSL element to it's Scala DSL counterpart. */
|
||||
/** Converts this Java DSL element to its Scala DSL counterpart. */
|
||||
def asScala: scaladsl.Source[Out, Mat] = delegate
|
||||
|
||||
/**
|
||||
|
|
@ -472,7 +468,7 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour
|
|||
* Transforms a stream of streams into a contiguous stream of elements using the provided flattening strategy.
|
||||
* This operation can be used on a stream of element type [[Source]].
|
||||
*/
|
||||
def flatten[U](strategy: akka.stream.FlattenStrategy[Out, U]): javadsl.Source[U, Mat] =
|
||||
def flatten[U](strategy: FlattenStrategy[Out, U]): javadsl.Source[U, Mat] =
|
||||
new Source(delegate.flatten(strategy))
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -1,12 +1,14 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.stream
|
||||
package akka.stream.scaladsl
|
||||
|
||||
import akka.stream.javadsl
|
||||
|
||||
/**
|
||||
* Strategy that defines how a stream of streams should be flattened into a stream of simple elements.
|
||||
*/
|
||||
abstract class FlattenStrategy[-T, U]
|
||||
abstract class FlattenStrategy[-S, T]
|
||||
|
||||
object FlattenStrategy {
|
||||
|
||||
|
|
@ -15,7 +17,7 @@ object FlattenStrategy {
|
|||
* emitting its elements directly to the output until it completes and then taking the next stream. This has the
|
||||
* consequence that if one of the input stream is infinite, no other streams after that will be consumed from.
|
||||
*/
|
||||
def concat[T]: FlattenStrategy[scaladsl.Source[T, _], T] = Concat[T]()
|
||||
def concat[T]: FlattenStrategy[Source[T, _], T] = Concat[T]()
|
||||
|
||||
private[akka] final case class Concat[T]() extends FlattenStrategy[scaladsl.Source[T, _], T]
|
||||
private[akka] final case class Concat[T]() extends FlattenStrategy[Source[T, _], T]
|
||||
}
|
||||
|
|
@ -286,6 +286,9 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module)
|
|||
this.section[O, O2, Mat2, Mat2](attributes, Keep.right)(section)
|
||||
}
|
||||
|
||||
/** Converts this Scala DSL element to it's Java DSL counterpart. */
|
||||
def asJava: javadsl.Flow[In, Out, Mat] = new javadsl.Flow(this)
|
||||
|
||||
}
|
||||
|
||||
object Flow extends FlowApply {
|
||||
|
|
@ -627,8 +630,8 @@ trait FlowOps[+Out, +Mat] {
|
|||
* Transforms a stream of streams into a contiguous stream of elements using the provided flattening strategy.
|
||||
* This operation can be used on a stream of element type [[akka.stream.scaladsl.Source]].
|
||||
*/
|
||||
def flatten[U](strategy: akka.stream.FlattenStrategy[Out, U]): Repr[U, Mat] = strategy match {
|
||||
case _: FlattenStrategy.Concat[Out] ⇒ andThen(ConcatAll())
|
||||
def flatten[U](strategy: FlattenStrategy[Out, U]): Repr[U, Mat] = strategy match {
|
||||
case _: FlattenStrategy.Concat[Out] | _: javadsl.FlattenStrategy.Concat[Out] ⇒ andThen(ConcatAll())
|
||||
case _ ⇒
|
||||
throw new IllegalArgumentException(s"Unsupported flattening strategy [${strategy.getClass.getName}]")
|
||||
}
|
||||
|
|
|
|||
|
|
@ -297,6 +297,9 @@ object FlowGraph extends GraphApply {
|
|||
|
||||
private[stream] def module: Module = moduleInProgress
|
||||
|
||||
/** Converts this Scala DSL element to it's Java DSL counterpart. */
|
||||
def asJava: javadsl.FlowGraph.Builder[M] = new javadsl.FlowGraph.Builder()(this)
|
||||
|
||||
}
|
||||
|
||||
object Implicits {
|
||||
|
|
|
|||
|
|
@ -1,45 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.stream.scaladsl
|
||||
|
||||
import akka.stream.javadsl
|
||||
import akka.stream.scaladsl
|
||||
|
||||
/**
|
||||
* Implicit converters allowing to convert between Java and Scala DSL elements.
|
||||
*/
|
||||
private[akka] object JavaConverters {
|
||||
|
||||
implicit final class AddAsJavaSource[Out, Mat](val source: scaladsl.Source[Out, Mat]) extends AnyVal {
|
||||
def asJava: javadsl.Source[Out, Mat] = new javadsl.Source(source)
|
||||
}
|
||||
implicit final class AddAsJavaFlow[In, Out, Mat](val flow: scaladsl.Flow[In, Out, Mat]) extends AnyVal {
|
||||
def asJava: javadsl.Flow[In, Out, Mat] = new javadsl.Flow(flow)
|
||||
}
|
||||
implicit final class AddAsJavaBidiFlow[I1, O1, I2, O2, Mat](val flow: scaladsl.BidiFlow[I1, O1, I2, O2, Mat]) extends AnyVal {
|
||||
def asJava: javadsl.BidiFlow[I1, O1, I2, O2, Mat] = new javadsl.BidiFlow(flow)
|
||||
}
|
||||
implicit final class AddAsJavaSink[In, Mat](val sink: scaladsl.Sink[In, Mat]) extends AnyVal {
|
||||
def asJava: javadsl.Sink[In, Mat] = new javadsl.Sink(sink)
|
||||
}
|
||||
implicit final class AsAsJavaFlowGraphBuilder[Out, Mat](val builder: scaladsl.FlowGraph.Builder[Mat]) extends AnyVal {
|
||||
def asJava: javadsl.FlowGraph.Builder[Mat] = new javadsl.FlowGraph.Builder()(builder)
|
||||
}
|
||||
|
||||
implicit final class AddAsScalaSource[Out, Mat](val source: javadsl.Source[Out, Mat]) extends AnyVal {
|
||||
def asScala: scaladsl.Source[Out, Mat] = source.asScala
|
||||
}
|
||||
implicit final class AddAsScalaFlow[In, Out, Mat](val flow: javadsl.Flow[In, Out, Mat]) extends AnyVal {
|
||||
def asScala: scaladsl.Flow[In, Out, Mat] = flow.asScala
|
||||
}
|
||||
implicit final class AddAsScalaBidiFlow[I1, O1, I2, O2, Mat](val flow: javadsl.BidiFlow[I1, O1, I2, O2, Mat]) extends AnyVal {
|
||||
def asScala: scaladsl.BidiFlow[I1, O1, I2, O2, Mat] = flow.asScala
|
||||
}
|
||||
implicit final class AddAsScalaSink[In, Mat](val sink: javadsl.Sink[In, Mat]) extends AnyVal {
|
||||
def asScala: scaladsl.Sink[In, Mat] = sink.asScala
|
||||
}
|
||||
implicit final class AsAsScalaFlowGraphBuilder[Out, Mat](val builder: javadsl.FlowGraph.Builder[Mat]) extends AnyVal {
|
||||
def asScala: FlowGraph.Builder[Mat] = builder.asScala
|
||||
}
|
||||
}
|
||||
|
|
@ -3,6 +3,7 @@
|
|||
*/
|
||||
package akka.stream.scaladsl
|
||||
|
||||
import akka.stream.javadsl
|
||||
import akka.actor.{ ActorRef, Props }
|
||||
import akka.stream.impl._
|
||||
import akka.stream.{ SinkShape, Inlet, Outlet, Graph }
|
||||
|
|
@ -40,6 +41,9 @@ final class Sink[-In, +Mat](private[stream] override val module: Module)
|
|||
new Sink(module.withAttributes(attr).wrap())
|
||||
|
||||
def named(name: String): Sink[In, Mat] = withAttributes(OperationAttributes.name(name))
|
||||
|
||||
/** Converts this Scala DSL element to it's Java DSL counterpart. */
|
||||
def asJava: javadsl.Sink[In, Mat] = new javadsl.Sink(this)
|
||||
}
|
||||
|
||||
object Sink extends SinkApply {
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@
|
|||
*/
|
||||
package akka.stream.scaladsl
|
||||
|
||||
import akka.stream.javadsl
|
||||
import akka.stream.impl.Stages.{ MaterializingStageFactory, StageModule }
|
||||
import akka.stream.{ SourceShape, Inlet, Outlet }
|
||||
import akka.stream.impl.StreamLayout.{ EmptyModule, Module }
|
||||
|
|
@ -152,6 +153,9 @@ final class Source[+Out, +Mat](private[stream] override val module: Module)
|
|||
override def withAttributes(attr: OperationAttributes): Repr[Out, Mat] =
|
||||
new Source(module.withAttributes(attr).wrap())
|
||||
|
||||
/** Converts this Scala DSL element to it's Java DSL counterpart. */
|
||||
def asJava: javadsl.Source[Out, Mat] = new javadsl.Source(this)
|
||||
|
||||
}
|
||||
|
||||
object Source extends SourceApply {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue