diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorFlowMaterializerImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorFlowMaterializerImpl.scala index 9d6b9cdc5b..a6a4f91c59 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorFlowMaterializerImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorFlowMaterializerImpl.scala @@ -118,7 +118,7 @@ private[akka] case class ActorFlowMaterializerImpl(override val settings: ActorF (UnfairMerge.props(effectiveSettings, shape.inlets.size), shape.preferred +: shape.inArray.toSeq, shape.out) case ConcatModule(shape, _) ⇒ - require(shape.inArray.size == 2, "currently only supporting concatenation of exactly two inputs") // FIXME + require(shape.inArray.size == 2, "currently only supporting concatenation of exactly two inputs") // TODO (Concat.props(effectiveSettings), shape.inArray.toSeq, shape.out) case zip: ZipWithModule ⇒ @@ -270,7 +270,7 @@ private[akka] object ActorProcessorFactory { case GroupBy(f, _) ⇒ (GroupByProcessorImpl.props(settings, f), ()) case PrefixAndTail(n, _) ⇒ (PrefixAndTailImpl.props(settings, n), ()) case SplitWhen(p, _) ⇒ (SplitWhenProcessorImpl.props(settings, p), ()) - case ConcatAll(_) ⇒ (ConcatAllImpl.props(materializer), ()) //FIXME closes over the materializer, is this good? + case ConcatAll(_) ⇒ (ConcatAllImpl.props(materializer), ()) case StageFactory(mkStage, _) ⇒ (ActorInterpreter.props(settings, List(mkStage()), materializer), ()) case TimerTransform(mkStage, _) ⇒ (TimerTransformerProcessorsImpl.props(settings, mkStage()), ()) case MaterializingStageFactory(mkStageAndMat, _) ⇒ diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala index b8f5c29f2e..2de23fb189 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala @@ -249,7 +249,6 @@ private[akka] abstract class ActorProcessorImpl(val settings: ActorFlowMateriali with ActorLogging with Pump { - // FIXME: make pump a member protected val primaryInputs: Inputs = new BatchingInputBuffer(settings.initialInputBufferSize, this) { override def inputOnError(e: Throwable): Unit = ActorProcessorImpl.this.onError(e) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/FanoutProcessor.scala b/akka-stream/src/main/scala/akka/stream/impl/FanoutProcessor.scala index ed37d664bd..fb3ac2c8a2 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FanoutProcessor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FanoutProcessor.scala @@ -79,11 +79,9 @@ private[akka] abstract class FanoutOutputs(val maxBufferSize: Int, val initialBu case SubscribePending ⇒ subscribePending() case RequestMore(subscription, elements) ⇒ - // FIXME can we avoid this cast? moreRequested(subscription.asInstanceOf[ActorSubscriptionWithCursor[Any]], elements) pump.pump() case Cancel(subscription) ⇒ - // FIXME can we avoid this cast? unregisterSubscription(subscription.asInstanceOf[ActorSubscriptionWithCursor[Any]]) pump.pump() } diff --git a/akka-stream/src/main/scala/akka/stream/impl/FuturePublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/FuturePublisher.scala index 5fd0d05ab3..7b6141e25f 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FuturePublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FuturePublisher.scala @@ -142,7 +142,7 @@ private[akka] class FuturePublisher(future: Future[Any], settings: ActorFlowMate } } - override def postStop(): Unit = // FIXME if something blows up, are the subscribers onErrored? + override def postStop(): Unit = if (exposedPublisher ne null) exposedPublisher.shutdown(shutdownReason) diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index b74022efb0..1bdbcd25de 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -173,7 +173,7 @@ private[akka] final case class Grouped[T](n: Int) extends PushPullStage[T, immut override def onPull(ctx: Context[immutable.Seq[T]]): SyncDirective = if (ctx.isFinishing) { val elem = buf.result() - buf.clear() //FIXME null out the reference to the `buf`? + buf.clear() left = n ctx.pushAndFinish(elem) } else ctx.pull() diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 713b892ab4..506dc7f42c 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -223,7 +223,7 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph * `n` must be positive, otherwise IllegalArgumentException is thrown. */ def grouped(n: Int): javadsl.Flow[In, java.util.List[Out @uncheckedVariance], Mat] = - new Flow(delegate.grouped(n).map(_.asJava)) // FIXME optimize to one step + new Flow(delegate.grouped(n).map(_.asJava)) // TODO optimize to one step /** * Similar to `fold` but is not a terminal operation, @@ -249,7 +249,7 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph * IllegalArgumentException is thrown. */ def groupedWithin(n: Int, d: FiniteDuration): javadsl.Flow[In, java.util.List[Out @uncheckedVariance], Mat] = - new Flow(delegate.groupedWithin(n, d).map(_.asJava)) // FIXME optimize to one step + new Flow(delegate.groupedWithin(n, d).map(_.asJava)) // TODO optimize to one step /** * Discard the given number of elements at the beginning of the stream. @@ -371,7 +371,7 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph * the element is dropped and the stream and substreams continue. */ def groupBy[K](f: japi.Function[Out, K]): javadsl.Flow[In, akka.japi.Pair[K, javadsl.Source[Out @uncheckedVariance, Unit]], Mat] = - new Flow(delegate.groupBy(f.apply).map { case (k, p) ⇒ akka.japi.Pair(k, p.asJava) }) // FIXME optimize to one step + new Flow(delegate.groupBy(f.apply).map { case (k, p) ⇒ akka.japi.Pair(k, p.asJava) }) // TODO optimize to one step /** * This operation applies the given predicate to all incoming elements and diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 1f6f1eb9dd..bdfef7f87b 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -3,6 +3,7 @@ */ package akka.stream.javadsl +import scala.collection.immutable import java.util.concurrent.Callable import akka.actor.{ Cancellable, ActorRef, Props } import akka.japi.Util @@ -95,9 +96,21 @@ object Source { * Iterator, but every Subscriber directly attached to the Publisher of this * stream will see an individual flow of elements (always starting from the * beginning) regardless of when they subscribed. + * + * Make sure that the `Iterable` is immutable or at least not modified after + * being used as a `Source`. Otherwise the stream may fail with + * `ConcurrentModificationException` or other more subtle errors may occur. */ - def from[O](iterable: java.lang.Iterable[O]): javadsl.Source[O, Unit] = - new Source(scaladsl.Source(akka.stream.javadsl.japi.Util.immutableIterable(iterable))) + def from[O](iterable: java.lang.Iterable[O]): javadsl.Source[O, Unit] = { + // this adapter is not immutable if the the underlying java.lang.Iterable is modified + // but there is not anything we can do to prevent that from happening. + // ConcurrentModificationException will be thrown in some cases. + val scalaIterable = new immutable.Iterable[O] { + import collection.JavaConverters._ + override def iterator: Iterator[O] = iterable.iterator().asScala + } + new Source(scaladsl.Source(scalaIterable)) + } /** * Start a new `Source` from the given `Future`. The stream will consist of @@ -366,7 +379,7 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour * @param n must be positive, and `d` must be greater than 0 seconds, otherwise [[IllegalArgumentException]] is thrown. */ def groupedWithin(n: Int, d: FiniteDuration): javadsl.Source[java.util.List[Out @uncheckedVariance], Mat] = - new Source(delegate.groupedWithin(n, d).map(_.asJava)) // FIXME optimize to one step + new Source(delegate.groupedWithin(n, d).map(_.asJava)) // TODO optimize to one step /** * Discard the given number of elements at the beginning of the stream. @@ -476,7 +489,7 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour * to consume only one of them. */ def groupBy[K](f: japi.Function[Out, K]): javadsl.Source[akka.japi.Pair[K, javadsl.Source[Out @uncheckedVariance, Unit]], Mat] = - new Source(delegate.groupBy(f.apply).map { case (k, p) ⇒ akka.japi.Pair(k, p.asJava) }) // FIXME optimize to one step + new Source(delegate.groupBy(f.apply).map { case (k, p) ⇒ akka.japi.Pair(k, p.asJava) }) // TODO optimize to one step /** * This operation applies the given predicate to all incoming elements and diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/japi/Util.scala b/akka-stream/src/main/scala/akka/stream/javadsl/japi/Util.scala deleted file mode 100644 index 2024476d39..0000000000 --- a/akka-stream/src/main/scala/akka/stream/javadsl/japi/Util.scala +++ /dev/null @@ -1,17 +0,0 @@ -/** - * Copyright (C) 2014 Typesafe Inc. - */ -package akka.stream.javadsl.japi - -import scala.collection.immutable - -object Util { - - import collection.JavaConverters._ - // FIXME this does not make something an immutable iterable!! - def immutableIterable[T](iterable: java.lang.Iterable[T]): immutable.Iterable[T] = - new immutable.Iterable[T] { - override def iterator: Iterator[T] = iterable.iterator().asScala - } - -} diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/japi/WithVariance.scala b/akka-stream/src/main/scala/akka/stream/javadsl/japi/WithVariance.scala index 522ddd2bb4..5b31a67ac1 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/japi/WithVariance.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/japi/WithVariance.scala @@ -9,7 +9,7 @@ package akka.stream.javadsl.japi /** * A Function interface. Used to create first-class-functions is Java. */ -@SerialVersionUID(1L) // FIXME: add variance to akka.japi and remove this akka.stream.japi! +@SerialVersionUID(1L) // TODO: add variance to akka.japi and remove this akka.stream.japi! trait Function[-T, +R] { @throws(classOf[Exception]) def apply(param: T): R @@ -18,7 +18,7 @@ trait Function[-T, +R] { /** * A Function interface. Used to create 2-arg first-class-functions is Java. */ -@SerialVersionUID(1L) // FIXME: add variance to akka.japi and remove this akka.stream.japi! +@SerialVersionUID(1L) // TODO: add variance to akka.japi and remove this akka.stream.japi! trait Function2[-T1, -T2, +R] { @throws(classOf[Exception]) def apply(arg1: T1, arg2: T2): R @@ -27,7 +27,7 @@ trait Function2[-T1, -T2, +R] { /** * A constructor/factory, takes no parameters but creates a new value of type T every call. */ -@SerialVersionUID(1L) // FIXME: add variance to akka.japi and remove this akka.stream.japi! +@SerialVersionUID(1L) // TODO: add variance to akka.japi and remove this akka.stream.japi! trait Creator[+T] extends Serializable { /** * This method must return a different instance upon every call. @@ -39,7 +39,7 @@ trait Creator[+T] extends Serializable { /** * A Procedure is like a Function, but it doesn't produce a return value. */ -@SerialVersionUID(1L) // FIXME: add variance to akka.japi and remove this akka.stream.japi! +@SerialVersionUID(1L) // TODO: add variance to akka.japi and remove this akka.stream.japi! trait Procedure[-T] { @throws(classOf[Exception]) def apply(param: T): Unit @@ -48,7 +48,7 @@ trait Procedure[-T] { /** * Java API: Defines a criteria and determines whether the parameter meets this criteria. */ -@SerialVersionUID(1L) // FIXME: add variance to akka.japi and remove this akka.stream.japi! +@SerialVersionUID(1L) // TODO: add variance to akka.japi and remove this akka.stream.japi! trait Predicate[-T] { def test(param: T): Boolean } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/FlexiRoute.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/FlexiRoute.scala index e16ac52926..b982fba64e 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/FlexiRoute.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/FlexiRoute.scala @@ -240,7 +240,6 @@ abstract class FlexiRoute[In, S <: Shape](val shape: S, attributes: OperationAtt case None ⇒ super.toString } - // FIXME what to do about this? override def withAttributes(attr: OperationAttributes): Graph[S, Unit] = throw new UnsupportedOperationException( "withAttributes not supported by default by FlexiRoute, subclass may override and implement it")