Akka(43): Http:SSE-Server Sent Event – 服务端主推消息

 
 对于.NET的分布式应用开发,可以供我们挑选的技术同框架比较多,例如webservice,.net
remoting,MSMQ,WCF等等技术。对于这些技能很多丁犹未会见生,即时没有深刻的刺探,但是得听说了,每种技术还各发优势以及适用范围,没有绝对的上下,只有相对的适用程度。不过可惜了,今天咱们讲解的主题不是即刻几种植技术,今天紧要教授的是ASP.NET
WebAPI。

 
 因为自了解Akka-http的重中之重目的不是以有关Web-Server的编程,而是想实现同仿系统并的api,所以也急需考虑由劳动端主动往客户端发送指令的行使场景。比如一个零售店管理平台的服务端在形成了几许数据更新后要通知每零售门市客户端下载最新数据。虽然Akka-http也供针对性websocket商事的支持,但websocket的网络连接是双向恒久的,适合频繁的问答交互式服务端与客户端的交流,消息结构也比较零碎。而我们面临的或是是批次型的大气数据库数据交换,只需要简单的服务端单向信息就是执行了,所以websocket不极端合适,而Akka-http的SSE应该比较可我们的求。SSE模式的基本原理是服务端统一集中宣布消息,各客户端持久订阅服务端发布之信并起信息之始末中筛选产生属于自己应有推行之命令,然后开展相应的拍卖。客户端接收SSE是以一个单身的线程里持续拓展的,不见面影响客户端当前之演算流程。当接过有效的消息继即便见面调用一个事务功能函数作为后台异步运算任务。

   对于ASP.NET
WebAPI的优势以及特色,在此处就不讲了,需要采用的当然就是见面选择,也未待自家浪费篇幅去讲授这些,这篇博文主要教授ASP.NET
WebAPI中之HTTP消息之布局和拍卖消息的中心目标。

劳端的SSE发布是坐Source[ServerSentEvent,NotUsed]来促成的。ServerSentEvent类型定义如下:

一.WebAPI的HTTP概述:

 
 有关HTTP协议的连带内容在此处就无做牵线,在作者前面的博文被已召开了介绍,现在提供一下地方,因为过多之废话就是浪费时间,我便聊看这首博文的读者就指向HTTP协议及WebAPI都拥有了解。博文地址:

http://www.cnblogs.com/pengze0902/p/5976388.html

http://www.cnblogs.com/pengze0902/p/6224792.html

http://www.cnblogs.com/pengze0902/p/6230105.html

/**
 * Representation of a server-sent event. According to the specification, an empty data field designates an event
 * which is to be ignored which is useful for heartbeats.
 *
 * @param data data, may span multiple lines
 * @param eventType optional type, must not contain \n or \r
 * @param id optional id, must not contain \n or \r
 * @param retry optional reconnection delay in milliseconds
 */
final case class ServerSentEvent(
  data:      String,
  eventType: Option[String] = None,
  id:        Option[String] = None,
  retry:     Option[Int]    = None) {...}

   1.当.NET4.5前的版中,处理HTTP的基本目标:

     
(1).在客户端:System.Net.HttpWebRequest用于初始化HTTP请求,处理相关的应; System.Net.HttpWebResponse处理HTTP响应头和数量读取的摸索。

     
(2).在服务器端:System.Web.HttpContext,System.Web.HttpRequest,System.Web.HttpResponse类用当ASP.NET上生文中,代表单个请求与应。System.Net.HttpListenerContext类,提供对HTTP请求和应对象的顾。

夫项目的参数代表事件信息之数据结构。用户可以因实际得充分利用这个数据结构来传递信息。服务端是经过complete以SeverSentEvent类为素的Source来展开SSE的,如下:

   2.每当.NET4.5本被,处理HTTP的基本目标:

     
(1).在客户端和服务器端使用同样的接近。(HttpRequestMessage和HttpResponseMessage对象吃无带有上下文消息,所以可以以服务器和客户端共用。)

     
(2).由于当.NET4.5受引入了TAP(异步任务模型),所以当新的HTTP模型中,处理HTTP请求的法可以async和awit实现异步编程。(可以略高效的贯彻异步编程)

   
我们对于新老的HTTP编程模型时,会很爱之发现于初本子的HTTP模型中,无论是编程的难度及代码编写的精简度,已经推行之效率还是老大高之。在对Web项目的出被,我们针对HTTP知识之问询是不可或缺的,对于ASP.NET的HTTP处理的规律在这里就是未开具体的介绍,网上为生比较多之章可供应阅读和了解。

   
对于ASP.NET的HTTP处理方式的刺探,是我当付出微信公众平台时更加学习的,微信公众平台供了对外访问的接口,我们的次第和服务器对微信服务器的接口进行呼吁访问,微信服务器获取HTTP请求后,返回处理结果,本地服务器获取返回结果。这样一个要-响应模式,组成一个会话。对于微信公众平台的开对多刚好学习.NET的人数吧有些高大(当然就是相对而言),即时开发了那个频繁夫类别的次第的丁(调用第三正在接口的开)也未自然好充分鲜明的懂得是里面的规律,笔者以为对于这么的老三正值平台的开支,其利害攸关的着力组成部分就是是于HTTP协议的处理,建立请求、获取响应消息及分析消息立即三怪步子,返回的音信内容一般也json或者xml,获取响应消息继,主要是本着信息内容的反序列化,获得信息之实业信息,进而在程序中尤其处理。

   
在WeAPI中信息的起与剖析,以及信息之格式都是可以动态的开创和情商,下面我们尤其的摸底实现这同过程的中坚目标。

    import akka.http.scaladsl.marshalling.sse.EventStreamMarshalling._
         complete {
            Source
              .tick(2.seconds, 2.seconds, NotUsed)
              .map( _ => processToServerSentEvent)
              .keepAlive(1.second, () => ServerSentEvent.heartbeat)
          }

二.WebAPI的HTTP消息分析:

     
HTTP协议的干活方法是在客户端和服务器之间交换请求和应消息,那么就为便得说明HTTP的主干就是信息,对于“消息”的刺探,我们若掌握消息分为“消息头部”和“消息内容”,我们接下的指向新HTTP编程模型的介绍的中心虽是“消息头部”和“消息内容”。

     
在命名空间System.Net.Http中,具有两个主导目标:HttpRequestMessage和HttpResponseMessage。两单目标的构造使下图:

图片 1

     
以上要教授了HttpRequestMessage对象以及HttpResponseMessage对象涵盖的要害内容,请求与应消息还可以蕴涵一个可选的消息正文,两蒙受信息类型及消息内容,都可使用应的标头。接下来具体了解部分信息之结构。

以上代码代表劳务端定时运算processToServerSentEvent返回ServerSentEvent类型结果后公布为拥有订阅的客户端。我们就此一个函数processToServerSentEvent模拟重复运算的作业功能:

    1.HttpRequestMessage目标解析:

         (1).HttpRequestMessage主要性能和法概述:

名称 说明
Version 获取或设置 HTTP 消息版本
Content 获取或设置 HTTP 消息的内容
Method 获取或设置 HTTP 请求信息使用的 HTTP 方法
RequestUri 获取或设置 HTTP 请求的 Uri
Headers 获取 HTTP 请求标头的集合
Properties 获取 HTTP 请求的属性集
ToString 返回表示当前对象的字符串

        该目标主要用来表示 HTTP
请求消息。对于拖欠目标的这些性和方,大部分应该都非见面生,因为一个HTTP消息中根本含有头部、消息内容等等,在这里主要介绍一个性质Properties,该属性并无属其他正式的HTTP消息,当消息传时,不见面保留该属性。

         (2).Properties属性解析:

[__DynamicallyInvokable]
public IDictionary<string, object> Properties
{
    [__DynamicallyInvokable]
    get
    {
        if (this.properties == null)
        {
            this.properties = new Dictionary<string, object>();
        }
        return this.properties;
    }
}

   
有以上之代码可以生明显的相该属性只来一个但念属性,并返回一个IDictionary<string,
object>。当消息于服务器或者客户端本地开展拍卖时,该属性用于保存附加的音信信息。该属性只是一个通用的器皿,保存本地信息属性。(与接受信息之总是相关的客户端认证;将信息和布局路由于进行匹配,得到的路由数据)

  private def processToServerSentEvent: ServerSentEvent = {
    Thread.sleep(3000)   //processing delay
    ServerSentEvent(SyncFiles.fileToSync)
  }

   2.HttpResponseMessage对象解析:

        (1).HttpRequestMessage主要性能与方法概述:

名称 说明
EnsureSuccessStatusCode 如果 HTTP 响应的 IsSuccessStatusCode 属性为  false, 将引发异常
StatusCode 获取或设置 HTTP 响应的状态代码
ReasonPhrase 获取或设置服务器与状态代码通常一起发送的原因短语
RequestMessage 获取或设置导致此响应消息的请求消息
IsSuccessStatusCode 获取一个值,该值指示 HTTP 响应是否成功

     
对于拖欠目标的一部分性能没有列举,因为于HttpRequestMessage对象已经介绍,如:Version、Content、Headers等,该对象主要用于表示
HTTP 响应消息。在此地要介绍StatusCode属性。

       (2).StatusCode属性:

[__DynamicallyInvokable]
public HttpStatusCode StatusCode
{
    [__DynamicallyInvokable, TargetedPatchingOptOut("Performance critical to inline this type of method across NGen image boundaries")]
    get
    {
        return this.statusCode;
    }
    [__DynamicallyInvokable]
    set
    {
        if ((value < ((HttpStatusCode) 0)) || (value > ((HttpStatusCode) 0x3e7)))
        {
            throw new ArgumentOutOfRangeException("value");
        }
        this.CheckDisposed();
        this.statusCode = value;
    }
}

   
 StatusCode属性为枚举属性,该属性可读而写,对于状态码这个概念,很多口犹是较了解之,在HTTP协议被,状态码主要是意味于信息之求在服务器被拍卖的结果,状态产生2XX,3XX,4XX,5XX等等,具体表示的义就是不再描述。

此函数模拟发布事件数量是某种业务运算结果,在此代表客户端需要下载文件名称。我们所以客户端request来法设定是文件名称:

     3.HTTP型消息标头解析:

         
在HTTP中,请求与应消息,以及消息内容本身,都得使称为标头的额外字段,包含重复多之音。

       (1).标头分类:

标头名称 描述 HTTP模型标头容器类
User-Agent 为请求提供扩展信息,描述产生这个请求的应用程序 HttpRequestHeaders
Server 为响应提供关于源服务器软件的扩展信息 HttpResponseHeaders
Content-Type 定义请求或响应有效载荷正文中,资源表示使用的媒体类型 HttpContentHeaders

       (2).HttpHeaders抽象类分析:

名称 描述
Add 添加指定的标头及其值到 HttpHeaders 集合中。
TryAddWithoutValidation 返回一个值,该值指示指定标头及其值是否已添加到HttpHeaders 集合,而未验证所提供的信息。
Clear 从 HttpHeaders 集合中移除所有标头。
Remove 从HttpHeaders集合中移除指定的标头。
GetValues 返回存储在HttpHeaders 集合中所有指定标头的标头值。
Contains 如果指定标头存在于 HttpHeaders 集合则返回。
ToString 返回表示当前 HttpHeaders对象的字符串。

     
 HttpHeaders是一个抽象类,HttpRequestHeaders、HttpResponseHeaders、HttpContentHeaders三个像样继承了此类。接下来我们来打探一下Add()方法:

[__DynamicallyInvokable]
public void Add(string name, string value)
{
    HeaderStoreItemInfo info;
    bool flag;
    this.CheckHeaderName(name);
    this.PrepareHeaderInfoForAdd(name, out info, out flag);
    this.ParseAndAddValue(name, info, value);
    if (flag && (info.ParsedValue != null))
    {
        this.AddHeaderToStore(name, info);
    }
}

     
 Add()方法有两独重载版本,该方式可以为容器添加标头,如果一旦抬高的标头有标准名,在丰富之前标头值会开展认证。Add方法还会见证明标头是否足以生出差不多只价。

  object SyncFiles {
    var fileToSync: String = ""
  }
  private def route = {
    import Directives._
    import akka.http.scaladsl.marshalling.sse.EventStreamMarshalling._

    def syncRequests =
      pathPrefix("sync") {
        pathSingleSlash {
        post {
            parameter("file") { filename =>
              complete {
                SyncFiles.fileToSync = filename
                s"set download file to : $filename"
              }
            }
          }
        }
      }

   4.HTTP消息内容分析:

     
在.NET4.5本的HTTP模型中,HTTP消息之正文由抽象基类HttpContent表示,HttpResponseMessage和HttpRequestMessage对象都带有一个HttpContent类型的Content属性。

     (1).HttpContent主要性能和道:

名称 描述
ReadAsByteArrayAsync 以异步操作将 HTTP 内容写入字节数组。
SerializeToStreamAsync 以异步操作将 HTTP 内容序列化到流。
CopyToAsync 以异步操作将 HTTP 内容写入流。
LoadIntoBufferAsync 以异步操作将 HTTP 内容序列化到内存缓冲区。
CreateContentReadStreamAsync 以异步操作将 HTTP 内容写入内存流。
TryComputeLength 确定 HTTP 内容是否具备有效的字节长度。
Headers 根据 RFC 2616 中的定义,获取内容标头。

     (2).CopyToAsync()方法分析:

[__DynamicallyInvokable]
public Task CopyToAsync(Stream stream, TransportContext context)
{
    Action<Task> continuation = null;
    this.CheckDisposed();
    if (stream == null)
    {
        throw new ArgumentNullException("stream");
    }
    TaskCompletionSource<object> tcs = new TaskCompletionSource<object>();
    try
    {
        Task task = null;
        if (this.IsBuffered)
        {
            task = Task.Factory.FromAsync<byte[], int, int>(new Func<byte[], int, int, 
            AsyncCallback, object, IAsyncResult>(stream.BeginWrite), new Action<IAsyncResult>(stream.EndWrite), 
       this.bufferedContent.GetBuffer(), 0, (int) this.bufferedContent.Length, null);
        }
        else
        {
            task = this.SerializeToStreamAsync(stream, context);
            this.CheckTaskNotNull(task);
        }
        if (continuation == null)
        {
            continuation = delegate (Task copyTask) {
                if (copyTask.IsFaulted)
                {
                    tcs.TrySetException(GetStreamCopyException(copyTask.Exception.GetBaseException()));
                }
                else if (copyTask.IsCanceled)
                {
                    tcs.TrySetCanceled();
                }
                else
                {
                    tcs.TrySetResult(null);
                }
            };
        }
        task.ContinueWithStandard(continuation);
    }
    catch (IOException exception)
    {
        tcs.TrySetException(GetStreamCopyException(exception));
    }
    catch (ObjectDisposedException exception2)
    {
        tcs.TrySetException(GetStreamCopyException(exception2));
    }
    return tcs.Task;
}

   
在采取信息内容常常,需要动用HtppContent的法门要扩展方法。在HttpContent中动用CopyToAsync()方法以推送方式访原本的信息内容,由艺术代码可以看,该办法接受两个参数,一个凡是流对象,一个凡有关传输的消息(例如,通道绑定),此参数可以呢
null。该法可将信内容写副到这流中。

    在拖欠措施的实现代码中
创建了一个TaskCompletionSource<object>的泛型对象,该目标表示不绑定到委托的 Task<TResult> 的制造者方,并由此 Task 属性提供针对性使用者在的走访。SerializeToStreamAsync方法以盛传的流对象序列化,该措施为异步方法。

   
我们需要留意的几沾,主要为委托的创建及使用,在C#惨遭,尽量用有.NET提供的委托类,不要自己失去创造。还有一些即便是于先后中针对老的处理方式,异常的破获具有层次性,并且调用了于定义之一个要命处理措施TrySetException。

    (2).ReadAsStreamAsync()方法分析:

     
在博旧消息内容经常,除了调用上面介绍的法子外,还得调用ReadAsStreamAsync()方法以拉取的主意访原本之音内容。

     
在HttpContent中涵盖有另外两独八九不离十的计,ReadAsStringAsync()和ReadAsByteArrayAsync()异步的提供信息内容之缓冲副本,ReadAsByteArrayAsync()返回原的字节内容,ReadAsStringAsync()将内容解码为字符串返回。

客户端订阅SSE的办法如下:

三.DotNet中初老HTTP模型解析:

    import akka.http.scaladsl.unmarshalling.sse.EventStreamUnmarshalling._
    import system.dispatcher

    Http()
      .singleRequest(Get("http://localhost:8011/events"))
      .flatMap(Unmarshal(_).to[Source[ServerSentEvent, NotUsed]])
      .foreach(_.runForeach(se => downloadFiles(se.data)))

   1..NET4.5事先版本创建HTTP POST请求实例:

        public static string HttpPost(string postUrl, string postData)
        {
            if (string.IsNullOrEmpty(postUrl))
                throw new ArgumentNullException(postUrl);
            if (string.IsNullOrEmpty(postData))
                throw new ArgumentNullException(postData);
            var request = WebRequest.Create(postUrl) as HttpWebRequest;
            if (request == null)
                throw new ArgumentNullException("postUrl");
            try
            {
                var cookieContainer = new CookieContainer();
                request.CookieContainer = cookieContainer;
                request.AllowAutoRedirect = true;
                request.Method = "POST";
                request.ContentType = "application/x-www-form-urlencoded";
                var data = Encoding.UTF8.GetBytes(postData);
                request.ContentLength = data.Length;
                var outstream = request.GetRequestStream();
                outstream.Write(data, 0, data.Length);
                outstream.Close();
                //发送请求并获取相应回应数据,获取对应HTTP请求的响应
                var response = request.GetResponse() as HttpWebResponse;
                if (response != null)
                {
                    var instream = response.GetResponseStream();
                    var content = string.Empty;
                    if (instream == null)
                    {
                        return content;
                    }
                    using (var sr = new StreamReader(instream, Encoding.UTF8))
                    {
                        content = sr.ReadToEnd();
                    }
                    return content;
                }
            }
            catch (ArgumentException arex)
            {
                throw arex;
            }
            catch (IOException ioex)
            {
                throw ioex;
            }
            return null;
        }

于客户端收到SSE后虽运行downloadFiles(filename)函数。downloadFiles函数定义:

   2..NET4.5本子创建HTTP POST请求实例:

async static void getResponse(string url)
        {
            using (HttpClient client = new HttpClient())
            {
                using (HttpResponseMessage response = await client.GetAsync(url))
                {
                    using (HttpContent content = response.Content)
                    {
                        string myContent = await content.ReadAsStringAsync();
                    }
                }
            }
        }
        async static void postResponse(string url)
        {
            while (true)
            {
                IEnumerable<KeyValuePair<string, string>> queries = new List<KeyValuePair<string, string>>()
            {
                new KeyValuePair<string, string> ("test","test")
            };
                HttpContent q = new FormUrlEncodedContent(queries);
                using (HttpClient client = new HttpClient())
                {
                    using (HttpResponseMessage response = await client.PostAsync(url, q))
                    {
                        using (HttpContent content = response.Content)
                        {
                            string myContent = await content.ReadAsStringAsync();

                            Console.WriteLine(myContent);
                        }
                    }
                }
            }
        }
  def downloadFiles(file: String) = {
    Thread.sleep(3000)   //process delay
    if (file != "")
      println(s"Try to download $file")
  }

四.总结:

 
 以上要教学了.NET4.5事先和今后版本对HTTP编程模式之片情节, 两者的显要区别在于.NET4.5版之前的HTTP编程模型会区分客户端和服务器,两者采用的靶子有不同,实现的法则及虽是必然的相似性,但是以的类似可不比。.NET4.5下的本中,对象的用没有客户端和服务器的分,两者可以共用。

下是客户端程序的测试运算步骤:

    scala.io.StdIn.readLine()
    println("do some thing ...")
    Http().singleRequest(
      HttpRequest(method=HttpMethods.POST,uri = "http://localhost:8011/sync/?file=Orders")
    ).onSuccess {
      case msg => println(msg)
    }

    scala.io.StdIn.readLine()
    println("do some other things ...")
    Http().singleRequest(
      HttpRequest(method=HttpMethods.POST,uri = "http://localhost:8011/sync/?file=Items")
    ).onSuccess {
      case msg => println(msg)
    }

运算结果:

do some thing ...
HttpResponse(200 OK,List(Server: akka-http/10.0.10, Date: Fri, 15 Dec 2017 05:50:52 GMT),HttpEntity.Strict(text/plain; charset=UTF-8,set download file to : Orders),HttpProtocol(HTTP/1.1))
Try to download Orders
Try to download Orders

do some other things ...
HttpResponse(200 OK,List(Server: akka-http/10.0.10, Date: Fri, 15 Dec 2017 05:51:02 GMT),HttpEntity.Strict(text/plain; charset=UTF-8,set download file to : Items),HttpProtocol(HTTP/1.1))
Try to download Orders
Try to download Orders
Try to download Items
Try to download Items

Try to download Items

Process finished with exit code 0

脚是本次讨论的示范源代码:

服务端:

import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import scala.concurrent.duration.DurationInt
import akka.http.scaladsl.model.sse.ServerSentEvent

object SSEServer {

  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem()
    implicit val mat    = ActorMaterializer()
    Http().bindAndHandle(route, "localhost", 8011)

    scala.io.StdIn.readLine()
    system.terminate()
  }

  object SyncFiles {
    var fileToSync: String = ""
  }
  private def route = {
    import Directives._
    import akka.http.scaladsl.marshalling.sse.EventStreamMarshalling._

    def syncRequests =
      pathPrefix("sync") {
        pathSingleSlash {
        post {
            parameter("file") { filename =>
              complete {
                SyncFiles.fileToSync = filename
                s"set download file to : $filename"
              }
            }
          }
        }
      }

    def events =
      path("events") {
        get {
          complete {
            Source
              .tick(2.seconds, 2.seconds, NotUsed)
              .map( _ => processToServerSentEvent)
              .keepAlive(1.second, () => ServerSentEvent.heartbeat)
          }
        }
      }

    syncRequests ~ events
  }

  private def processToServerSentEvent: ServerSentEvent = {
    Thread.sleep(3000)   //processing delay
    ServerSentEvent(SyncFiles.fileToSync)
  }
}

客户端:

import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.client.RequestBuilding.Get
import akka.http.scaladsl.model.HttpMethods
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import akka.http.scaladsl.model.sse.ServerSentEvent
import akka.http.scaladsl.model._

object SSEClient {

  def downloadFiles(file: String) = {
    Thread.sleep(3000)   //process delay
    if (file != "")
      println(s"Try to download $file")
  }

  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem()
    implicit val mat    = ActorMaterializer()

    import akka.http.scaladsl.unmarshalling.sse.EventStreamUnmarshalling._
    import system.dispatcher

    Http()
      .singleRequest(Get("http://localhost:8011/events"))
      .flatMap(Unmarshal(_).to[Source[ServerSentEvent, NotUsed]])
      .foreach(_.runForeach(se => downloadFiles(se.data)))

    scala.io.StdIn.readLine()
    println("do some thing ...")
    Http().singleRequest(
      HttpRequest(method=HttpMethods.POST,uri = "http://localhost:8011/sync/?file=Orders")
    ).onSuccess {
      case msg => println(msg)
    }

    scala.io.StdIn.readLine()
    println("do some other things ...")
    Http().singleRequest(
      HttpRequest(method=HttpMethods.POST,uri = "http://localhost:8011/sync/?file=Items")
    ).onSuccess {
      case msg => println(msg)
    }


    scala.io.StdIn.readLine()
    system.terminate()
  }
}

 

 我的博客即将同到腾讯云+社区。邀大家一同入驻http://cloud.tencent.com/developer/support-plan