=str #16549 doc: Integration with External Services

This commit is contained in:
Patrik Nordwall 2014-12-18 10:34:59 +01:00
parent ef2835d60e
commit fdbd19e302
7 changed files with 636 additions and 56 deletions

View file

@ -0,0 +1,360 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.stream
import scala.concurrent.duration._
import akka.stream.testkit.AkkaSpec
import akka.stream.scaladsl.Source
import java.util.Date
import akka.stream.FlowMaterializer
import scala.concurrent.Future
import akka.stream.scaladsl.RunnableFlow
import akka.stream.scaladsl.Sink
import akka.testkit.TestProbe
import akka.actor.ActorRef
import com.typesafe.config.ConfigFactory
import akka.actor.Actor
import akka.actor.Props
import akka.pattern.ask
import akka.util.Timeout
import akka.stream.scaladsl.OperationAttributes
import scala.concurrent.ExecutionContext
import akka.stream.MaterializerSettings
import java.util.concurrent.atomic.AtomicInteger
object IntegrationDocSpec {
import TwitterStreamQuickstartDocSpec._
val config = ConfigFactory.parseString("""
#//#blocking-dispatcher-config
blocking-dispatcher {
executor = "thread-pool-executor"
thread-pool-executor {
core-pool-size-min = 10
core-pool-size-max = 10
}
}
#//#blocking-dispatcher-config
akka.actor.default-mailbox.mailbox-type = akka.dispatch.UnboundedMailbox
""")
class AddressSystem {
//#email-address-lookup
def lookupEmail(handle: String): Future[Option[String]] =
//#email-address-lookup
Future.successful(Some(handle + "@somewhere.com"))
//#phone-lookup
def lookupPhoneNumber(handle: String): Future[Option[String]] =
//#phone-lookup
Future.successful(Some(handle.hashCode.toString))
}
final case class Email(to: String, title: String, body: String)
final case class TextMessage(to: String, body: String)
class EmailServer(probe: ActorRef) {
//#email-server-send
def send(email: Email): Future[Unit] = {
// ...
//#email-server-send
probe ! email.to
Future.successful(())
//#email-server-send
}
//#email-server-send
}
class SmsServer(probe: ActorRef) {
//#sms-server-send
def send(text: TextMessage): Unit = {
// ...
//#sms-server-send
probe ! text.to
//#sms-server-send
}
//#sms-server-send
}
final case class Save(tweet: Tweet)
final case object SaveDone
class DatabaseService(probe: ActorRef) extends Actor {
override def receive = {
case Save(tweet: Tweet) =>
probe ! tweet.author.handle
sender() ! SaveDone
}
}
//#sometimes-slow-service
class SometimesSlowService(implicit ec: ExecutionContext) {
//#sometimes-slow-service
def println(s: String): Unit = ()
//#sometimes-slow-service
private val runningCount = new AtomicInteger
def convert(s: String): Future[String] = {
println(s"running: $s (${runningCount.incrementAndGet()})")
Future {
if (s.nonEmpty && s.head.isLower)
Thread.sleep(500)
else
Thread.sleep(20)
println(s"completed: $s (${runningCount.decrementAndGet()})")
s.toUpperCase
}
}
}
//#sometimes-slow-service
}
class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) {
import TwitterStreamQuickstartDocSpec._
import IntegrationDocSpec._
implicit val mat = FlowMaterializer()
"calling external service with mapAsync" in {
val probe = TestProbe()
val addressSystem = new AddressSystem
val emailServer = new EmailServer(probe.ref)
//#tweet-authors
val authors: Source[Author] =
tweets
.filter(_.hashtags.contains(Akka))
.map(_.author)
//#tweet-authors
//#email-addresses-mapAsync
val emailAddresses: Source[String] =
authors
.mapAsync(author => addressSystem.lookupEmail(author.handle))
.collect { case Some(emailAddress) => emailAddress }
//#email-addresses-mapAsync
//#send-emails
val sendEmails: RunnableFlow =
emailAddresses
.mapAsync { address =>
emailServer.send(
Email(to = address, title = "Akka", body = "I like your tweet"))
}
.to(Sink.ignore)
sendEmails.run()
//#send-emails
probe.expectMsg("rolandkuhn@somewhere.com")
probe.expectMsg("patriknw@somewhere.com")
probe.expectMsg("bantonsson@somewhere.com")
probe.expectMsg("drewhk@somewhere.com")
probe.expectMsg("ktosopl@somewhere.com")
probe.expectMsg("mmartynas@somewhere.com")
probe.expectMsg("akkateam@somewhere.com")
}
"calling external service with mapAsyncUnordered" in {
val probe = TestProbe()
val addressSystem = new AddressSystem
val emailServer = new EmailServer(probe.ref)
//#external-service-mapAsyncUnordered
val authors: Source[Author] =
tweets.filter(_.hashtags.contains(Akka)).map(_.author)
val emailAddresses: Source[String] =
authors
.mapAsyncUnordered(author => addressSystem.lookupEmail(author.handle))
.collect { case Some(emailAddress) => emailAddress }
val sendEmails: RunnableFlow =
emailAddresses
.mapAsyncUnordered { address =>
emailServer.send(
Email(to = address, title = "Akka", body = "I like your tweet"))
}
.to(Sink.ignore)
sendEmails.run()
//#external-service-mapAsyncUnordered
probe.receiveN(7).toSet should be(Set(
"rolandkuhn@somewhere.com",
"patriknw@somewhere.com",
"bantonsson@somewhere.com",
"drewhk@somewhere.com",
"ktosopl@somewhere.com",
"mmartynas@somewhere.com",
"akkateam@somewhere.com"))
}
"careful managed blocking with mapAsync" in {
val probe = TestProbe()
val addressSystem = new AddressSystem
val smsServer = new SmsServer(probe.ref)
val authors = tweets.filter(_.hashtags.contains(Akka)).map(_.author)
val phoneNumbers =
authors.mapAsync(author => addressSystem.lookupPhoneNumber(author.handle))
.collect { case Some(phoneNo) => phoneNo }
//#blocking-mapAsync
val blockingExecutionContext = system.dispatchers.lookup("blocking-dispatcher")
val sendTextMessages: RunnableFlow =
phoneNumbers
.mapAsync { phoneNo =>
Future {
smsServer.send(
TextMessage(to = phoneNo, body = "I like your tweet"))
}(blockingExecutionContext)
}
.to(Sink.ignore)
sendTextMessages.run()
//#blocking-mapAsync
probe.receiveN(7).toSet should be(Set(
"rolandkuhn".hashCode.toString,
"patriknw".hashCode.toString,
"bantonsson".hashCode.toString,
"drewhk".hashCode.toString,
"ktosopl".hashCode.toString,
"mmartynas".hashCode.toString,
"akkateam".hashCode.toString))
}
"careful managed blocking with map" in {
val probe = TestProbe()
val addressSystem = new AddressSystem
val smsServer = new SmsServer(probe.ref)
val authors = tweets.filter(_.hashtags.contains(Akka)).map(_.author)
val phoneNumbers =
authors.mapAsync(author => addressSystem.lookupPhoneNumber(author.handle))
.collect { case Some(phoneNo) => phoneNo }
//#blocking-map
val sendTextMessages: RunnableFlow =
phoneNumbers
.section(OperationAttributes.dispatcher("blocking-dispatcher")) {
_.map { phoneNo =>
smsServer.send(
TextMessage(to = phoneNo, body = "I like your tweet"))
}
}
.to(Sink.ignore)
sendTextMessages.run()
//#blocking-map
probe.expectMsg("rolandkuhn".hashCode.toString)
probe.expectMsg("patriknw".hashCode.toString)
probe.expectMsg("bantonsson".hashCode.toString)
probe.expectMsg("drewhk".hashCode.toString)
probe.expectMsg("ktosopl".hashCode.toString)
probe.expectMsg("mmartynas".hashCode.toString)
probe.expectMsg("akkateam".hashCode.toString)
}
"calling actor service with mapAsync" in {
val probe = TestProbe()
val database = system.actorOf(Props(classOf[DatabaseService], probe.ref), "db")
//#save-tweets
import akka.pattern.ask
val akkaTweets: Source[Tweet] = tweets.filter(_.hashtags.contains(Akka))
implicit val timeout = Timeout(3.seconds)
val saveTweets: RunnableFlow =
akkaTweets
.mapAsync(tweet => database ? Save(tweet))
.to(Sink.ignore)
//#save-tweets
saveTweets.run()
probe.expectMsg("rolandkuhn")
probe.expectMsg("patriknw")
probe.expectMsg("bantonsson")
probe.expectMsg("drewhk")
probe.expectMsg("ktosopl")
probe.expectMsg("mmartynas")
probe.expectMsg("akkateam")
}
"illustrate ordering and parallelism of mapAsync" in {
val probe = TestProbe()
def println(s: String): Unit = {
if (s.startsWith("after:"))
probe.ref ! s
}
//#sometimes-slow-mapAsync
implicit val blockingExecutionContext = system.dispatchers.lookup("blocking-dispatcher")
val service = new SometimesSlowService
implicit val mat = FlowMaterializer(
MaterializerSettings(system).withInputBuffer(initialSize = 4, maxSize = 4))
Source(List("a", "B", "C", "D", "e", "F", "g", "H", "i", "J"))
.map(elem => { println(s"before: $elem"); elem })
.mapAsync(service.convert)
.foreach(elem => println(s"after: $elem"))
//#sometimes-slow-mapAsync
probe.expectMsg("after: A")
probe.expectMsg("after: B")
probe.expectMsg("after: C")
probe.expectMsg("after: D")
probe.expectMsg("after: E")
probe.expectMsg("after: F")
probe.expectMsg("after: G")
probe.expectMsg("after: H")
probe.expectMsg("after: I")
probe.expectMsg("after: J")
}
"illustrate ordering and parallelism of mapAsyncUnordered" in {
val probe = TestProbe()
def println(s: String): Unit = {
if (s.startsWith("after:"))
probe.ref ! s
}
//#sometimes-slow-mapAsyncUnordered
implicit val blockingExecutionContext = system.dispatchers.lookup("blocking-dispatcher")
val service = new SometimesSlowService
implicit val mat = FlowMaterializer(
MaterializerSettings(system).withInputBuffer(initialSize = 4, maxSize = 4))
Source(List("a", "B", "C", "D", "e", "F", "g", "H", "i", "J"))
.map(elem => { println(s"before: $elem"); elem })
.mapAsyncUnordered(service.convert)
.foreach(elem => println(s"after: $elem"))
//#sometimes-slow-mapAsyncUnordered
probe.receiveN(10).toSet should be(Set(
"after: A",
"after: B",
"after: C",
"after: D",
"after: E",
"after: F",
"after: G",
"after: H",
"after: I",
"after: J"))
}
}

View file

@ -26,23 +26,40 @@ import concurrent.Future
import akka.stream.testkit.AkkaSpec
// TODO replace with => and disable this intellij setting
class TwitterStreamQuickstartDocSpec extends AkkaSpec {
implicit val executionContext = system.dispatcher
object TwitterStreamQuickstartDocSpec {
//#model
final case class Author(handle: String)
val AkkaTeam = Author("akkateam")
val Akka = Hashtag("#akka")
final case class Hashtag(name: String)
final case class Tweet(author: Author, timestamp: Long, body: String) {
def hashtags: List[Hashtag] =
body.split(" ").toList.collect { case t if t.startsWith("#") Hashtag(t) }
def hashtags: Set[Hashtag] =
body.split(" ").collect { case t if t.startsWith("#") Hashtag(t) }.toSet
}
//#model
val tweets = Source(
Tweet(Author("rolandkuhn"), (new Date).getTime, "#akka rocks!") ::
Tweet(Author("patriknw"), (new Date).getTime, "#akka !") ::
Tweet(Author("bantonsson"), (new Date).getTime, "#akka !") ::
Tweet(Author("drewhk"), (new Date).getTime, "#akka !") ::
Tweet(Author("ktosopl"), (new Date).getTime, "#akka on the rocks!") ::
Tweet(Author("mmartynas"), (new Date).getTime, "wow #akka !") ::
Tweet(Author("akkateam"), (new Date).getTime, "#akka rocks!") ::
Tweet(Author("bananaman"), (new Date).getTime, "#bananas rock!") ::
Tweet(Author("appleman"), (new Date).getTime, "#apples rock!") ::
Tweet(Author("drama"), (new Date).getTime, "we compared #apples to #oranges!") ::
Nil)
}
// TODO replace with => and disable this intellij setting
class TwitterStreamQuickstartDocSpec extends AkkaSpec {
import TwitterStreamQuickstartDocSpec._
implicit val executionContext = system.dispatcher
trait Example0 {
//#tweet-source
val tweets: Source[Tweet]
@ -56,33 +73,20 @@ class TwitterStreamQuickstartDocSpec extends AkkaSpec {
//#materializer-setup
}
val tweets = Source(
Tweet(Author("rolandkuhn"), (new Date).getTime, "#akka rocks!") ::
Tweet(Author("patriknw"), (new Date).getTime, "#akka!") ::
Tweet(Author("bantonsson"), (new Date).getTime, "#akka!") ::
Tweet(Author("drewhk"), (new Date).getTime, "#akka!") ::
Tweet(Author("ktosopl"), (new Date).getTime, "#akka on the rocks!") ::
Tweet(Author("mmartynas"), (new Date).getTime, "wow #akka!") ::
Tweet(Author("akkateam"), (new Date).getTime, "#akka rocks!") ::
Tweet(Author("bananaman"), (new Date).getTime, "#bananas rock!") ::
Tweet(Author("appleman"), (new Date).getTime, "#apples rock!") ::
Tweet(Author("drama"), (new Date).getTime, "we compared #apples to #oranges!") ::
Nil)
implicit val mat = FlowMaterializer()
"filter and map" in {
//#authors-filter-map
val authors: Source[Author] =
tweets
.filter(_.hashtags.contains("#akka"))
.filter(_.hashtags.contains(Akka))
.map(_.author)
//#authors-filter-map
trait Example3 {
//#authors-collect
val authors: Source[Author] =
tweets.collect { case t if t.hashtags.contains("#akka") t.author }
tweets.collect { case t if t.hashtags.contains(Akka) t.author }
//#authors-collect
}
@ -97,7 +101,7 @@ class TwitterStreamQuickstartDocSpec extends AkkaSpec {
"mapConcat hashtags" in {
//#hashtags-mapConcat
val hashtags: Source[Hashtag] = tweets.mapConcat(_.hashtags)
val hashtags: Source[Hashtag] = tweets.mapConcat(_.hashtags.toList)
//#hashtags-mapConcat
}
@ -119,7 +123,7 @@ class TwitterStreamQuickstartDocSpec extends AkkaSpec {
val b = Broadcast[Tweet]
tweets ~> b ~> Flow[Tweet].map(_.author) ~> writeAuthors
b ~> Flow[Tweet].mapConcat(_.hashtags) ~> writeHashtags
b ~> Flow[Tweet].mapConcat(_.hashtags.toList) ~> writeHashtags
}
g.run()
//#flow-graph-broadcast
@ -181,13 +185,13 @@ class TwitterStreamQuickstartDocSpec extends AkkaSpec {
val sumSink = Sink.fold[Int, Int](0)(_ + _)
val counterRunnableFlow: RunnableFlow =
tweetsInMinuteFromNow
.filter(_.hashtags contains "#akka")
.filter(_.hashtags contains Akka)
.map(t 1)
.to(sumSink)
// materialize the stream once in the morning
val morningMaterialized = counterRunnableFlow.run()
// and once in the evening, reusing the
// and once in the evening, reusing the
val eveningMaterialized = counterRunnableFlow.run()
// the sumSink materialized two different futures