Akka(43): Http:SSE-Server Sent 伊芙(Eve)nt – 服务端主推音讯

 
 因为自身询问Akka-http的要害目标不是为着有关Web-Server的编程,而是想已毕一套系统融为一体的api,所以也亟需考虑由服务端主动向客户端发送指令的运用场景。比如一个零售店管理平台的服务端在成就了一点数据更新后需求通告各零售门市客户端下载最新数据。即便Akka-http也提供对websocket协商的扶助,但websocket的网络连接是双向恒久的,适合频仍的问答交互式服务端与客户端的沟通,新闻结构也正如零碎。而大家面临的或者是批次型的汪洋数据库数据调换,只须要不难的服务端单向音信就行了,所以websocket不太对劲,而Akka-http的SSE应该相比符合大家的需求。SSE格局的基本原理是服务端统一集中公布音信,各客户端持久订阅服务端发布的音信并从新闻的情节中筛选出属于自己相应推行的命令,然后举办相应的拍卖。客户端接收SSE是在一个独自的线程里不断开展的,不会潜移默化客户端当前的运算流程。当接受有效的音信后就会调用一个事情功用函数作为后台异步运算职责。

升級到 Delphi 10.2 Tokyo 笔记:

服务端的SSE公布是以Source[ServerSentEvent,NotUsed]来贯彻的。ServerSent伊芙(Eve)nt类型定义如下:

  • 更新 Xcode 8.3 & iOS 10.3 测试:
  • 类型 TULargeInteger 需改用 ULARGE_INTEGER
  • MapView 载图已援救(iOS & Android):
    • 不用改:FMX.Maps.Android.pas
    • 不用改:FMX.Maps.iOS.pas
  • Android 无法载入 GIF
    问题已改进:https://quality.embarcadero.com/browse/RSP-11327

    • 不用改:FMX.Graphics.Android.pas
  • THTTPClient.BeginGet 回传类型改变:

    • // Delphi 10.1.2 Berlin

      var HTTPResponse: IHTTPResponse;
      HTTPResponse:= HTTPClient1.BeginGet(...);
      
      // Delphi 10.2 Tokyo
      var AsyncResult: IAsyncResult;
      AsyncResult := HTTPClient1.BeginGet(...);
      

       

  • 已修正:https://quality.embarcadero.com/browse/RSP-12693 左:Delphi 10.1.2
    Berlin
    右:Delphi 10.2 Tokyo
    图片 2

  • Android
    平台,启动已没有黑屏,此前运用的黑屏处理方法,可以不再须求运用了。
  • 线程绘图测试:Test Multi Thread Bitmap
    http://docwiki.embarcadero.com/RADStudio/Tokyo/en/Multi-Threading_for_TBitmap,_TCanvas,_and_TContext3D

    图片 3

  • 问题:FMX 使用 TPopup :

    • 放一个 TEdit,运行时点入这一个 Edit 没有游标?
    • 放一个 TMemo.ReadOnly = True 会弹出键盘?
  • ID:
    30781

    已修正)

    题材:TMessageManager.DefaultManager.SubscribeToMessage
    没成效了?(实测唯有 Android 平台有题目,此外平台没问题)

    • 按 button1 ,并不会到 test
      图片 4
    • 感谢 swish 提醒,因为:Android 下 OnIdle
      事件不会被正常触发造成的,可以团结调用下 WakeMainThread
      就能触发两次 OnIdle
      图片 5
  • ID:
    30781
     已修正)

    题材:使用 AddObject 造成重影问题(移动平台 Android & iOS
    才有问题,Windows & macOS 测试没问题)

    • 第一步: 使用 Rectangle1.AddObject(SubRectangle);
    • 第二步: 使用 Rectangle2.AddObject(SubRectangle);
    • 再反覆第一步,第二步,造成重影及卡顿?
    • 官方QC:https://quality.embarcadero.com/browse/RSP-17663
    • 图片 6
    • 测试工程:[测试]10.2_重影问题.zip
    • 勘误方法:
      将 FMX.Controls.pas 复制到自己的工程目录下,再修改如下:

      procedure TStyledControl.SetNewScene(AScene: IScene);
      var
        OldScene: IScene;
      begin
        OldScene := FScene;
        inherited SetNewScene(AScene);
        if not (csDestroying in ComponentState) and (OldScene <> AScene) and (not IsUpdating)  then
      {---> KillResourceLink; // 删除代码
      {+++>}NeedStyleLookup; // 加入代码:移动平台重影问题,改回 Delphi 10.1.2 Berlin 代码,暂时修正 by Aone
      end;
      
    • 图片 7

  • 题材:即使改动官方源码 FMX.Types.pas 会造成不可以编译 macOS
    平台(其余平台正常),错误如下:
    图片 8
  • ID:
    30781
     已修正)

    问题:TTabControl 加二页,第一页放 TText,第二页放
    TColorPanel,真机运行后,点第二页,不可以及时突显第二页内容,须求再点一下,才会体现。(Android 有问题 iOS 测试没问题)
    测试APK:[BUG]TabControl切页无法即时突显.apk
    测试工程:[BUG]TabControl切页不可能即时显示.zip 官方QC:https://quality.embarcadero.com/browse/RSP-17738
  • 问题:(Delphi 10.1.2 正常)
    场所一:在 TRectangle 里放一个 TButton(须求安装
    ModalResult=mrOk),点 Button 后将 Rectangle 由主窗 RemoveObject
    后失误
    境况二:如若一个 Frame 里有一个闭馆按钮,按下后关门(使用
    Frame.Parent := nil; 或 RemoveObject )就会出错
    图片 9
    测试工程:[BUG]TestRemoveObjectScene.zip 改良方法:关键问题是在 ModalResult
    = mrOk,只要不设置 Button.ModalResult 就不会出错了(其它也可参考
    10.1.2 的 FMX.Controls.pas 内 TControl.SetNewScene(AScene: IScene)
    函数)
    官方QC:https://quality.embarcadero.com/browse/RSP-17741
  • ID:
    30781
     已修正)

    问题:Android 开启相簿(使用官方例子),闪退?
    法定例子:http://docwiki.embarcadero.com/RADStudio/Tokyo/en/Taking_Pictures_Using_FireMonkey_Interfaces
  • 问题:在 Linux 平台,uses System.Zip 会出错?
    图片 10官方QC:https://quality.embarcadero.com/browse/RSP-17811 解决方案:安装
    sudo apt-get install joe wget
    p7zip-full curl build-essential zlib1g-dev
    libcurl4-gnutls-dev
  • ID:
    30781
     已修正)

    问题:Android 平台运用 TControl.MakeScreenshot
    若有文字,截图后,文字会变成黑块。
/**
 * Representation of a server-sent event. According to the specification, an empty data field designates an event
 * which is to be ignored which is useful for heartbeats.
 *
 * @param data data, may span multiple lines
 * @param eventType optional type, must not contain \n or \r
 * @param id optional id, must not contain \n or \r
 * @param retry optional reconnection delay in milliseconds
 */
final case class ServerSentEvent(
  data:      String,
  eventType: Option[String] = None,
  id:        Option[String] = None,
  retry:     Option[Int]    = None) {...}

 

本条类其他参数代表事件新闻的数据结构。用户可以按照实际需求丰盛利用这些数据结构来传递新闻。服务端是透过complete以SeverSent伊夫nt类为因素的Source来举办SSE的,如下:

参考:

    import akka.http.scaladsl.marshalling.sse.EventStreamMarshalling._
         complete {
            Source
              .tick(2.seconds, 2.seconds, NotUsed)
              .map( _ => processToServerSentEvent)
              .keepAlive(1.second, () => ServerSentEvent.heartbeat)
          }

以上代码代表服务端定时运算processToServerSent伊夫(Eve)nt再次回到ServerSent伊芙nt类型结果后发布给所有订阅的客户端。大家用一个函数processToServerSent伊芙nt模拟重复运算的工作职能:

  private def processToServerSentEvent: ServerSentEvent = {
    Thread.sleep(3000)   //processing delay
    ServerSentEvent(SyncFiles.fileToSync)
  }

以此函数模拟发表事件数量是某种业务运算结果,在此间表示客户端须求下载文件名称。大家用客户端request来模拟设定那么些文件名称:

  object SyncFiles {
    var fileToSync: String = ""
  }
  private def route = {
    import Directives._
    import akka.http.scaladsl.marshalling.sse.EventStreamMarshalling._

    def syncRequests =
      pathPrefix("sync") {
        pathSingleSlash {
        post {
            parameter("file") { filename =>
              complete {
                SyncFiles.fileToSync = filename
                s"set download file to : $filename"
              }
            }
          }
        }
      }

客户端订阅SSE的主意如下:

    import akka.http.scaladsl.unmarshalling.sse.EventStreamUnmarshalling._
    import system.dispatcher

    Http()
      .singleRequest(Get("http://localhost:8011/events"))
      .flatMap(Unmarshal(_).to[Source[ServerSentEvent, NotUsed]])
      .foreach(_.runForeach(se => downloadFiles(se.data)))

每当客户端收到SSE后即运行downloadFiles(filename)函数。downloadFiles函数定义:

  def downloadFiles(file: String) = {
    Thread.sleep(3000)   //process delay
    if (file != "")
      println(s"Try to download $file")
  }

上边是客户端程序的测试运算步骤:

    scala.io.StdIn.readLine()
    println("do some thing ...")
    Http().singleRequest(
      HttpRequest(method=HttpMethods.POST,uri = "http://localhost:8011/sync/?file=Orders")
    ).onSuccess {
      case msg => println(msg)
    }

    scala.io.StdIn.readLine()
    println("do some other things ...")
    Http().singleRequest(
      HttpRequest(method=HttpMethods.POST,uri = "http://localhost:8011/sync/?file=Items")
    ).onSuccess {
      case msg => println(msg)
    }

运算结果:

do some thing ...
HttpResponse(200 OK,List(Server: akka-http/10.0.10, Date: Fri, 15 Dec 2017 05:50:52 GMT),HttpEntity.Strict(text/plain; charset=UTF-8,set download file to : Orders),HttpProtocol(HTTP/1.1))
Try to download Orders
Try to download Orders

do some other things ...
HttpResponse(200 OK,List(Server: akka-http/10.0.10, Date: Fri, 15 Dec 2017 05:51:02 GMT),HttpEntity.Strict(text/plain; charset=UTF-8,set download file to : Items),HttpProtocol(HTTP/1.1))
Try to download Orders
Try to download Orders
Try to download Items
Try to download Items

Try to download Items

Process finished with exit code 0

上边是此次探究的示范源代码:

服务端:

import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import scala.concurrent.duration.DurationInt
import akka.http.scaladsl.model.sse.ServerSentEvent

object SSEServer {

  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem()
    implicit val mat    = ActorMaterializer()
    Http().bindAndHandle(route, "localhost", 8011)

    scala.io.StdIn.readLine()
    system.terminate()
  }

  object SyncFiles {
    var fileToSync: String = ""
  }
  private def route = {
    import Directives._
    import akka.http.scaladsl.marshalling.sse.EventStreamMarshalling._

    def syncRequests =
      pathPrefix("sync") {
        pathSingleSlash {
        post {
            parameter("file") { filename =>
              complete {
                SyncFiles.fileToSync = filename
                s"set download file to : $filename"
              }
            }
          }
        }
      }

    def events =
      path("events") {
        get {
          complete {
            Source
              .tick(2.seconds, 2.seconds, NotUsed)
              .map( _ => processToServerSentEvent)
              .keepAlive(1.second, () => ServerSentEvent.heartbeat)
          }
        }
      }

    syncRequests ~ events
  }

  private def processToServerSentEvent: ServerSentEvent = {
    Thread.sleep(3000)   //processing delay
    ServerSentEvent(SyncFiles.fileToSync)
  }
}

客户端:

import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.client.RequestBuilding.Get
import akka.http.scaladsl.model.HttpMethods
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import akka.http.scaladsl.model.sse.ServerSentEvent
import akka.http.scaladsl.model._

object SSEClient {

  def downloadFiles(file: String) = {
    Thread.sleep(3000)   //process delay
    if (file != "")
      println(s"Try to download $file")
  }

  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem()
    implicit val mat    = ActorMaterializer()

    import akka.http.scaladsl.unmarshalling.sse.EventStreamUnmarshalling._
    import system.dispatcher

    Http()
      .singleRequest(Get("http://localhost:8011/events"))
      .flatMap(Unmarshal(_).to[Source[ServerSentEvent, NotUsed]])
      .foreach(_.runForeach(se => downloadFiles(se.data)))

    scala.io.StdIn.readLine()
    println("do some thing ...")
    Http().singleRequest(
      HttpRequest(method=HttpMethods.POST,uri = "http://localhost:8011/sync/?file=Orders")
    ).onSuccess {
      case msg => println(msg)
    }

    scala.io.StdIn.readLine()
    println("do some other things ...")
    Http().singleRequest(
      HttpRequest(method=HttpMethods.POST,uri = "http://localhost:8011/sync/?file=Items")
    ).onSuccess {
      case msg => println(msg)
    }


    scala.io.StdIn.readLine()
    system.terminate()
  }
}

 

 我的博客即将联合至腾讯云+社区。邀大家一同入驻http://cloud.tencent.com/developer/support-plan