=str Add IterableSource.

Signed-off-by: He-Pin <hepin1989@gmail.com>
This commit is contained in:
He-Pin 2023-08-22 12:54:36 +08:00 committed by kerr
parent 95bcbdd139
commit 5ad70fffeb
4 changed files with 130 additions and 7 deletions

View file

@ -227,7 +227,7 @@ class ReverseArrowSpec extends StreamSpec {
src ~> f
sink2 <~ f
(the[IllegalArgumentException] thrownBy (s <~ f <~ src)).getMessage should include(
"[StatefulMapConcat.out] is already connected")
"[IterableSource.out] is already connected")
ClosedShape
})
.run(),

View file

@ -311,19 +311,67 @@ class SourceSpec extends StreamSpec with DefaultTimeout {
}
"use decider when iterator throws" in {
// using stopping decider
Source
.fromIterator(() => (1 to 5).iterator.map(k => if (k != 3) k else throw TE("a")))
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.stoppingDecider))
.grouped(10)
.runWith(Sink.head)
.failed
.futureValue shouldBe a[TE]
// using stopping decider with recover
Source
.fromIterator(() => (1 to 5).toIterator.map(k => if (k != 3) k else throw TE("a")))
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.stoppingDecider))
.recoverWithRetries(1, { case _ => Source.empty })
.grouped(10)
.runWith(Sink.head)
.futureValue shouldBe List(1, 2)
// failing on every elements, using stopping decider
Source
.fromIterator(() => (1 to 5).toIterator.map(_ => throw TE("b")))
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.stoppingDecider))
.grouped(10)
.runWith(Sink.headOption)
.failed
.futureValue shouldBe a[TE]
// failing on every elements, using stopping decider and recover
Source
.fromIterator(() => (1 to 5).toIterator.map(_ => throw TE("b")))
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.stoppingDecider))
.recoverWithRetries(1, { case _ => Source.empty })
.grouped(10)
.runWith(Sink.headOption)
.futureValue shouldBe None
// using resuming decider
Source
.fromIterator(() => (1 to 5).toIterator.map(k => if (k != 3) k else throw TE("a")))
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
.grouped(10)
.runWith(Sink.head)
.futureValue should ===(List(1, 2, 4, 5))
// using restarting decider
Source
.fromIterator(() => (1 to 5).toIterator.map(k => if (k != 3) k else throw TE("a")))
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider))
.grouped(10)
.runWith(Sink.head)
.futureValue should ===(List(1, 2))
.futureValue should ===(List(1, 2, 1, 2, 1, 2, 1, 2, 1, 2))
// with failing on every elements, using restarting decider
Source
.fromIterator(() => (1 to 5).toIterator.map(_ => throw TE("b")))
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider))
.grouped(10)
.runWith(Sink.headOption)
.futureValue should ===(None)
.failed
.futureValue shouldBe a[TE]
}
}

View file

@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pekko.stream.impl.fusing
import scala.collection.immutable
import scala.util.control.NonFatal
import org.apache.pekko
import pekko.stream.{ Attributes, Outlet, SourceShape, Supervision }
import pekko.stream.ActorAttributes.SupervisionStrategy
import pekko.stream.impl.ReactiveStreamsCompliance
import pekko.stream.impl.Stages.DefaultAttributes
import pekko.stream.stage.{ GraphStage, GraphStageLogic, OutHandler }
private[pekko] final class IterableSource[T](val elements: immutable.Iterable[T]) extends GraphStage[SourceShape[T]] {
ReactiveStreamsCompliance.requireNonNullElement(elements)
override protected def initialAttributes: Attributes = DefaultAttributes.iterableSource
private val out = Outlet[T]("IterableSource.out")
override val shape: SourceShape[T] = SourceShape(out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with OutHandler {
private lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
private var currentIterator: Iterator[T] = _
override def onPull(): Unit =
try {
if (currentIterator eq null) {
currentIterator = elements.iterator
}
tryPushNextOrComplete()
} catch {
case NonFatal(ex) =>
decider(ex) match {
case Supervision.Stop => failStage(ex)
case Supervision.Resume => tryPushNextOrComplete()
case Supervision.Restart =>
currentIterator = elements.iterator
tryPushNextOrComplete()
}
}
private def tryPushNextOrComplete(): Unit =
if (currentIterator.hasNext) {
if (isAvailable(out)) {
push(out, currentIterator.next())
if (!currentIterator.hasNext) {
completeStage()
}
}
} else {
completeStage()
}
setHandler(out, this)
}
override def toString: String = "IterableSource"
}

View file

@ -28,7 +28,7 @@ import pekko.annotation.InternalApi
import pekko.stream.{ Outlet, SourceShape, _ }
import pekko.stream.impl.{ PublisherSource, _ }
import pekko.stream.impl.Stages.DefaultAttributes
import pekko.stream.impl.fusing.{ GraphStages, LazyFutureSource, LazySingleSource }
import pekko.stream.impl.fusing.{ GraphStages, IterableSource, LazyFutureSource, LazySingleSource }
import pekko.stream.impl.fusing.GraphStages._
import pekko.stream.stage.GraphStageWithMaterializedValue
import pekko.util.ConstantFun
@ -356,7 +356,7 @@ object Source {
* beginning) regardless of when they subscribed.
*/
def apply[T](iterable: immutable.Iterable[T]): Source[T, NotUsed] =
single(iterable).mapConcat(ConstantFun.scalaIdentityFunction).withAttributes(DefaultAttributes.iterableSource)
fromGraph(new IterableSource[T](iterable)).withAttributes(DefaultAttributes.iterableSource)
/**
* Starts a new `Source` from the given `Future`. The stream will consist of
@ -419,8 +419,7 @@ object Source {
* Create a `Source` that will continually emit the given element.
*/
def repeat[T](element: T): Source[T, NotUsed] = {
val next = Some((element, element))
unfold(element)(_ => next).withAttributes(DefaultAttributes.repeat)
fromIterator(() => Iterator.continually(element)).withAttributes(DefaultAttributes.repeat)
}
/**