!str #16951 Unify scaladsl and javadsl OperationAttributes
* Move actor specific attributes to ActorOperationAttributes
This commit is contained in:
parent
4388782af4
commit
00033313e0
55 changed files with 183 additions and 137 deletions
|
|
@ -9,6 +9,7 @@ import akka.stream.testkit.AkkaSpec
|
|||
import scala.concurrent.Await
|
||||
import scala.concurrent.duration._
|
||||
import scala.util.control.NoStackTrace
|
||||
import akka.stream.OperationAttributes
|
||||
|
||||
object FlexiDocSpec {
|
||||
//#fleximerge-zip-states
|
||||
|
|
|
|||
|
|
@ -9,6 +9,8 @@ import akka.stream.ActorFlowMaterializerSettings
|
|||
import akka.stream.Supervision
|
||||
import akka.stream.scaladsl._
|
||||
import akka.stream.testkit.AkkaSpec
|
||||
import akka.stream.OperationAttributes
|
||||
import akka.stream.ActorOperationAttributes
|
||||
|
||||
class FlowErrorDocSpec extends AkkaSpec {
|
||||
|
||||
|
|
@ -52,7 +54,7 @@ class FlowErrorDocSpec extends AkkaSpec {
|
|||
}
|
||||
val flow = Flow[Int]
|
||||
.filter(100 / _ < 50).map(elem => 100 / (5 - elem))
|
||||
.withAttributes(OperationAttributes.supervisionStrategy(decider))
|
||||
.withAttributes(ActorOperationAttributes.supervisionStrategy(decider))
|
||||
val source = Source(0 to 5).via(flow)
|
||||
|
||||
val result = source.runWith(Sink.fold(0)(_ + _))
|
||||
|
|
@ -75,7 +77,7 @@ class FlowErrorDocSpec extends AkkaSpec {
|
|||
if (elem < 0) throw new IllegalArgumentException("negative not allowed")
|
||||
else acc + elem
|
||||
}
|
||||
.withAttributes(OperationAttributes.supervisionStrategy(decider))
|
||||
.withAttributes(ActorOperationAttributes.supervisionStrategy(decider))
|
||||
val source = Source(List(1, 3, -1, 5, 7)).via(flow)
|
||||
val result = source.grouped(1000).runWith(Sink.head)
|
||||
// the negative element cause the scan stage to be restarted,
|
||||
|
|
|
|||
|
|
@ -10,6 +10,7 @@ import akka.stream.testkit.AkkaSpec
|
|||
import scala.collection.immutable
|
||||
import scala.concurrent.{ Future, Await }
|
||||
import scala.concurrent.duration._
|
||||
import akka.stream.OperationAttributes
|
||||
|
||||
class FlowGraphDocSpec extends AkkaSpec {
|
||||
|
||||
|
|
|
|||
|
|
@ -15,6 +15,8 @@ import akka.actor.Actor
|
|||
import akka.actor.Props
|
||||
import akka.pattern.ask
|
||||
import akka.util.Timeout
|
||||
import akka.stream.OperationAttributes
|
||||
import akka.stream.ActorOperationAttributes
|
||||
import scala.concurrent.ExecutionContext
|
||||
import akka.stream.ActorFlowMaterializerSettings
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
|
|
@ -170,7 +172,7 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) {
|
|||
tweets.filter(_.hashtags.contains(akka)).map(_.author)
|
||||
|
||||
//#email-addresses-mapAsync-supervision
|
||||
import OperationAttributes.supervisionStrategy
|
||||
import ActorOperationAttributes.supervisionStrategy
|
||||
import Supervision.resumingDecider
|
||||
|
||||
val emailAddresses: Source[String, Unit] =
|
||||
|
|
@ -268,7 +270,7 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) {
|
|||
.map { phoneNo =>
|
||||
smsServer.send(TextMessage(to = phoneNo, body = "I like your tweet"))
|
||||
}
|
||||
.withAttributes(OperationAttributes.dispatcher("blocking-dispatcher"))
|
||||
.withAttributes(ActorOperationAttributes.dispatcher("blocking-dispatcher"))
|
||||
val sendTextMessages: RunnableFlow[Unit] =
|
||||
phoneNumbers.via(send).to(Sink.ignore)
|
||||
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@ package docs.stream
|
|||
import akka.stream.{ OverflowStrategy, ActorFlowMaterializerSettings, ActorFlowMaterializer }
|
||||
import akka.stream.scaladsl._
|
||||
import akka.stream.testkit.AkkaSpec
|
||||
import akka.stream.OperationAttributes
|
||||
|
||||
class StreamBuffersRateSpec extends AkkaSpec {
|
||||
implicit val mat = ActorFlowMaterializer()
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue