葡京娱乐总站平台SDP(0):Streaming-Data-Processor – Data Processing with Akka-Stream

Source是一段程序的发端有的。一般的话Source是经过运算Query爆发一串数据行或者人工构建而成。Source也得以相互运算Query发生,然后合并成一条无序的数据源,如下伪代码的品种:

   1.在.NET4.5事先的版本中,处理HTTP的中央目的:

     
(1).在客户端:System.Net.HttpWebRequest用于起先化HTTP请求,处理有关的响应; System.Net.HttpWebResponse处理HTTP响应头和数据读取的物色。

     
(2).在劳动器端:System.Web.HttpContext,System.Web.HttpRequest,System.Web.HttpResponse类用在ASP.NET上下文中,代表单个请求和响应。System.Net.HttpListenerContext类,提供对HTTP请求和响应对象的拜访。

  def load_par(qrys: Query*): PRG[R,M] = ???

   2.在.NET4.5本子中,处理HTTP的骨干目标:

     
(1).在客户端和劳务器端使用相同的类。(HttpRequestMessage和HttpResponseMessage对象中不含有上下文消息,所以可以在服务器和客户端共用。)

     
(2).由于在.NET4.5中引入了TAP(异步任务模型),所以在新的HTTP模型中,处理HTTP请求的艺术可以应用async和awit实现异步编程。(可以简单高效的贯彻异步编程)

   
我们对此新旧的HTTP编程模型时,会很容易的发现在新本子的HTTP模型中,无论是编程的难度和代码编写的精简度,已经履行的效用都是很高的。在对于Web项目标开支中,大家对HTTP知识的明白是必备的,对于ASP.NET的HTTP处理的规律在此间就不做实际的介绍,网上也有相比较多的篇章可供阅读和询问。

   
对于ASP.NET的HTTP处理情势的精通,是自身在付出微信公众平台时更加深造的,微信公众平台提供了对外访问的接口,我们的先后和服务器对微信服务器的接口举行呼吁访问,微信服务器获取HTTP请求后,再次回到处理结果,本地服务器获取再次来到结果。这样一个呼吁-响应形式,组成一个对话。对于微信公众平台的付出对于许多刚学习.NET的人来说有些高大(当然这是相对而言),即时开发过很频繁以此项目标次序的人(调用第三方接口的支出)也不自然可以很显著的接头这些里面的原理,笔者认为对于这样的第三方平台的开支,其重点的中坚部分就是对此HTTP协议的处理,建立请求、获取响应信息和分析信息那三大步子,重返的信息内容一般为json或者xml,获取响应消息后,紧假若对音信内容的反体系化,得到新闻的实业音信,进而在程序中越发处理。

   
在WeAPI中音信的发生和分析,以及音讯的格式都是可以动态的创导和协和,下边我们更加的问询实现这一过程的主导目的。

 
 我把一般中小公司的IT系统分成两大片段:一是实时的数目收集(输入)部分,二是批量数据抽取、分析、处理部分。为了让传统中小型公司IT软件编程人士能开发服务器集群环境上数据平台(如云端数据平台)运行的软件系统,我打算通过这么些DSP(Streaming-Data-Processor)项目来兑现地点提到的第二有的。第一有的可以用CQRS(Command-Query-Responsibility-Separation)即读写分离架构和事件记录(event-sourcing)模式来促成一种高效飞快响应、安全稳定运转的多少搜集系统。这有些我会在成功SDP项目后以akka-persistence为着力,通过akka-http,AMQP如RabitMQ等技巧来落实。

   4.HTTP音信内容分析:

     
在.NET4.5本子的HTTP模型中,HTTP音信的正文由抽象基类HttpContent表示,HttpResponseMessage和HttpRequestMessage对象都带有一个HttpContent类型的Content属性。

     (1).HttpContent重要性能和措施:

名称 描述
ReadAsByteArrayAsync 以异步操作将 HTTP 内容写入字节数组。
SerializeToStreamAsync 以异步操作将 HTTP 内容序列化到流。
CopyToAsync 以异步操作将 HTTP 内容写入流。
LoadIntoBufferAsync 以异步操作将 HTTP 内容序列化到内存缓冲区。
CreateContentReadStreamAsync 以异步操作将 HTTP 内容写入内存流。
TryComputeLength 确定 HTTP 内容是否具备有效的字节长度。
Headers 根据 RFC 2616 中的定义,获取内容标头。

     (2).CopyToAsync()方法分析:

[__DynamicallyInvokable]
public Task CopyToAsync(Stream stream, TransportContext context)
{
    Action<Task> continuation = null;
    this.CheckDisposed();
    if (stream == null)
    {
        throw new ArgumentNullException("stream");
    }
    TaskCompletionSource<object> tcs = new TaskCompletionSource<object>();
    try
    {
        Task task = null;
        if (this.IsBuffered)
        {
            task = Task.Factory.FromAsync<byte[], int, int>(new Func<byte[], int, int, 
            AsyncCallback, object, IAsyncResult>(stream.BeginWrite), new Action<IAsyncResult>(stream.EndWrite), 
       this.bufferedContent.GetBuffer(), 0, (int) this.bufferedContent.Length, null);
        }
        else
        {
            task = this.SerializeToStreamAsync(stream, context);
            this.CheckTaskNotNull(task);
        }
        if (continuation == null)
        {
            continuation = delegate (Task copyTask) {
                if (copyTask.IsFaulted)
                {
                    tcs.TrySetException(GetStreamCopyException(copyTask.Exception.GetBaseException()));
                }
                else if (copyTask.IsCanceled)
                {
                    tcs.TrySetCanceled();
                }
                else
                {
                    tcs.TrySetResult(null);
                }
            };
        }
        task.ContinueWithStandard(continuation);
    }
    catch (IOException exception)
    {
        tcs.TrySetException(GetStreamCopyException(exception));
    }
    catch (ObjectDisposedException exception2)
    {
        tcs.TrySetException(GetStreamCopyException(exception2));
    }
    return tcs.Task;
}

   
在使用信息内容时,需要选择HtppContent的章程如故扩展方法。在HttpContent中利用CopyToAsync()方法以推送格局访问原本的信息内容,由艺术代码可以观望,该方法接受两个参数,一个是流对象,一个是关于传输的音信(例如,通道绑定),此参数可以为
null。该模式可以把信息内容写入到这么些流中。

    在该办法的兑现代码中
创立了一个TaskCompletionSource<object>的泛型对象,该目的表示未绑定到委托的 Task<TResult> 的创制者方,并通过 Task 属性提供对使用者方的拜会。SerializeToStreamAsync方法将盛传的流对象连串化,该措施为异步方法。

   
我们需要留意的几点,重要为委托的创导和动用,在C#中,尽量采用有.NET提供的委托类,不要自己去创制。还有某些就是在程序中对分外的处理形式,相当的捕获具有层次性,并且调用了自定义的一个那些处理形式TrySetException。

    (2).ReadAsStreamAsync()方法分析:

     
在取得原始音讯内容时,除了调用下面介绍的法子外,还是可以调用ReadAsStreamAsync()方法以拉取的办法访问原本的信息内容。

     
在HttpContent中富含有其余六个像样的艺术,ReadAsStringAsync()和ReadAsByteArrayAsync()异步的提供新闻内容的缓冲副本,ReadAsByteArrayAsync()再次来到原始的字节内容,ReadAsStringAsync()将内容解码为字符串再次来到。

Process-Node是SDP最首要的一个组成部分,因为多数用户定义的各类业务职能是在此地运算的。用户可以选拔对业务效能拓展拆分然后分担给不同的线程或不同的集群节点举行多线程并行或分布式的演算。SDP应该为用户程序提供多线程,并行式、分布式的运算函数。首先,运算用户程序后应爆发R类型结果还要,作为一种reactive软件,必须保证完全消耗上一阶段发生的有着R类型元素。下边是一个用户函数的格局:

二.WebAPI的HTTP信息分析:

     
HTTP协议的干活形式是在客户端和服务器之间交流请求和响应信息,那么这也就足以证实HTTP的为主就是音信,对于“音信”的摸底,大家假诺知道信息分为“音讯头部”和“音讯内容”,我们接下去的对新HTTP编程模型的牵线的中央就是“音信头部”和“音讯内容”。

     
在命名空间System.Net.Http中,具有三个主导目的:HttpRequestMessage和HttpResponseMessage。五个目标的构造如下图:

葡京娱乐总站平台 1

     
以上首要讲解了HttpRequestMessage对象和HttpResponseMessage对象涵盖的要害内容,请求和响应消息都可以蕴涵一个可选的音讯正文,两中新闻类型以及信息内容,都得以运用响应的标头。接下来具体了解部分音信的构造。

 

四.总结:

 
 以上重点教学了.NET4.5事先和今后版本对HTTP编程情势的片段内容, 两者的要紧区别在于.NET4.5本子此前的HTTP编程模型会区分客户端和服务器,两者接纳的对象存在不同,实现的法则上尽管存在必然的相似性,不过接纳的类却不同。.NET4.5之后的本子中,对象的利用没有客户端和服务器之分,两者可以共用。

从地点的以身作则中我们可以看出所有定义的函数都暴发PRG[R,M]项目结果。其中R类型就是stream的要素,它流动贯穿了先后的兼具环节。就像下水道网络运行规律一样:污水由源头Source流入终点Sink,在旅途可能因此五个污水处理节点Node。每一个节点代表对管道中流动污水处理的主意,包括分叉引流、并叉合流、添加化学物质、最终通过终点把拍卖过的水向外输出。在PRG中流淌的R类型可能是数据如数据库表的一行,又或者是一条Sring类型的query如plain-sql,可以用JDBC来运作。cassandra的CQL也是String类型的。Slick,Quill,ScalikeJDBC和局部其余ORM的Query都得以发生plain-sql。

   2.HttpResponseMessage目的解析:

        (1).HttpRequestMessage重要性能和措施概述:

名称 说明
EnsureSuccessStatusCode 如果 HTTP 响应的 IsSuccessStatusCode 属性为  false, 将引发异常
StatusCode 获取或设置 HTTP 响应的状态代码
ReasonPhrase 获取或设置服务器与状态代码通常一起发送的原因短语
RequestMessage 获取或设置导致此响应消息的请求消息
IsSuccessStatusCode 获取一个值,该值指示 HTTP 响应是否成功

     
对于该对象的局部性能没有列举,因为在HttpRequestMessage对象已经介绍,如:Version、Content、Headers等,该目的首要用于表示
HTTP 响应信息。在这里关键介绍StatusCode属性。

       (2).StatusCode属性:

[__DynamicallyInvokable]
public HttpStatusCode StatusCode
{
    [__DynamicallyInvokable, TargetedPatchingOptOut("Performance critical to inline this type of method across NGen image boundaries")]
    get
    {
        return this.statusCode;
    }
    [__DynamicallyInvokable]
    set
    {
        if ((value < ((HttpStatusCode) 0)) || (value > ((HttpStatusCode) 0x3e7)))
        {
            throw new ArgumentOutOfRangeException("value");
        }
        this.CheckDisposed();
        this.statusCode = value;
    }
}

   
 StatusCode属性为枚举属性,该属性可读可写,对于状态码这一个定义,很五人都是相比精晓的,在HTTP协议中,状态码紧倘诺象征在音信的呼吁在服务器中拍卖的结果,状态有2XX,3XX,4XX,5XX等等,具体表示的含义就不再描述。

Sink的紧要功用实际上是承保完全消耗程序中发生的拥有因素,这是reactive类型程序的总得要求。

 
 对于.NET的分布式应用开发,可以供我们接纳的技能和框架相比较多,例如webservice,.net
remoting,MSMQ,WCF等等技术。对于这些技巧很四人都不会陌生,即时没有深入的问询,但是毫无疑问听说过,每种技术都各有优势和适用范围,没有断然的好坏,只有相对的贴切程度。不过可惜了,前几日我们讲解的核心不是这两种技术,前几天重大讲解的是ASP.NET
WebAPI。

  def load(qry: Query): PRG[R,M] = ???
  def process1: PRG[R,M] = ???
  def process2: PRG[R,M] = ???
  def recursiveProcess(prg: PRG[R,M]): PRG[R,M] = ???
  def results: PRG = ???

  load(qryOrders).process1.process2.recursiveProcess(subprogram).results.run

一.WebAPI的HTTP概述:

 
 有关HTTP协议的连带内容在此间就不做牵线,在笔者前面的博文中已经做过介绍,现在提供一下地方,因为过多的废话就是浪费时间,我就姑且看这篇博文的读者已经对HTTP协议和WebAPI都怀有了然。博文地址:

http://www.cnblogs.com/pengze0902/p/5976388.html

http://www.cnblogs.com/pengze0902/p/6224792.html

http://www.cnblogs.com/pengze0902/p/6230105.html

 

     3.HTTP模子信息标头解析:

         
在HTTP中,请求和响应音讯,以及音讯内容本身,都可以应用称为标头的附加字段,包含更多的信息。

       (1).标头分类:

标头名称 描述 HTTP模型标头容器类
User-Agent 为请求提供扩展信息,描述产生这个请求的应用程序 HttpRequestHeaders
Server 为响应提供关于源服务器软件的扩展信息 HttpResponseHeaders
Content-Type 定义请求或响应有效载荷正文中,资源表示使用的媒体类型 HttpContentHeaders

       (2).HttpHeaders抽象类分析:

名称 描述
Add 添加指定的标头及其值到 HttpHeaders 集合中。
TryAddWithoutValidation 返回一个值,该值指示指定标头及其值是否已添加到HttpHeaders 集合,而未验证所提供的信息。
Clear 从 HttpHeaders 集合中移除所有标头。
Remove 从HttpHeaders集合中移除指定的标头。
GetValues 返回存储在HttpHeaders 集合中所有指定标头的标头值。
Contains 如果指定标头存在于 HttpHeaders 集合则返回。
ToString 返回表示当前 HttpHeaders对象的字符串。

     
 HttpHeaders是一个抽象类,HttpRequestHeaders、HttpResponseHeaders、HttpContentHeaders五个类继承了此类。接下来我们来精通一下Add()方法:

[__DynamicallyInvokable]
public void Add(string name, string value)
{
    HeaderStoreItemInfo info;
    bool flag;
    this.CheckHeaderName(name);
    this.PrepareHeaderInfoForAdd(name, out info, out flag);
    this.ParseAndAddValue(name, info, value);
    if (flag && (info.ParsedValue != null))
    {
        this.AddHeaderToStore(name, info);
    }
}

     
 Add()方法具有六个重载版本,该格局可以向容器添加标头,假诺要充足的标头有标准名,在添加前边标头值会进展表明。Add方法还会注解标头是否可以有多少个值。

 
 再有两天就进去2018了,想想如故要未雨绸缪一下过年的办事主旋律。回忆当年上顿时学函数式编程时的机要目标是想设计一套标准API給这个习惯了OOP情势支付商业使用软件的程序员们,使她们能用一种恍若传统数据库软件编程的法子来落实多线程,并行运算,分布式的多少处理应用程序,前提是这种编程格局不需要对函数式编程语言、多线程软件编程以及集群环境下的分布式软件编程模式有很高的阅历要求。前边试着发表了一个遵照scalaz-stream-fs2的数量处理工具开源项目。该项目为主实现了多线程的数据库数据并行处理,能充分利用域内服务器的多核CPU环境以streaming,non-blocking形式增强数据处理效能。近来刚落成了对整个akka套装(suite)的问询,感觉akka是一套精美的分布式编程工具:一是actor形式提供了多种多线程编程格局,再不怕akka-cluster能轻松地落实集群式的分布式编程,而集群环境变迁只需要调整安排文件,无需改变代码。akka-stream是一套功用进一步完整和有力的streaming工具库,那么只要以akka-stream为根基,设计一套能在集群环境里举行分布式多线程并行数据处理的开源编程工具应该可以是2018的重要任务。同样,用户仍是可以够按照他们深谙的数据库应用编程格局轻松实现分布式多线程并行数据处理程序的开支。

三.DotNet中新旧HTTP模型分析:

 

   1..NET4.5往日版本成立HTTP POST请求实例:

        public static string HttpPost(string postUrl, string postData)
        {
            if (string.IsNullOrEmpty(postUrl))
                throw new ArgumentNullException(postUrl);
            if (string.IsNullOrEmpty(postData))
                throw new ArgumentNullException(postData);
            var request = WebRequest.Create(postUrl) as HttpWebRequest;
            if (request == null)
                throw new ArgumentNullException("postUrl");
            try
            {
                var cookieContainer = new CookieContainer();
                request.CookieContainer = cookieContainer;
                request.AllowAutoRedirect = true;
                request.Method = "POST";
                request.ContentType = "application/x-www-form-urlencoded";
                var data = Encoding.UTF8.GetBytes(postData);
                request.ContentLength = data.Length;
                var outstream = request.GetRequestStream();
                outstream.Write(data, 0, data.Length);
                outstream.Close();
                //发送请求并获取相应回应数据,获取对应HTTP请求的响应
                var response = request.GetResponse() as HttpWebResponse;
                if (response != null)
                {
                    var instream = response.GetResponseStream();
                    var content = string.Empty;
                    if (instream == null)
                    {
                        return content;
                    }
                    using (var sr = new StreamReader(instream, Encoding.UTF8))
                    {
                        content = sr.ReadToEnd();
                    }
                    return content;
                }
            }
            catch (ArgumentException arex)
            {
                throw arex;
            }
            catch (IOException ioex)
            {
                throw ioex;
            }
            return null;
        }

 
按一般的scala和akka的编程情势编写多线程分布式数据库管理软件时一是要服从akka代码格局,使用scala编程语言的局部较深的语法;二是需要涉及异步Async调用,集群Cluster节点任务布置及Streaming对外集成actor运算格局的底细,用户需要有所一定的scala,akka使用经验。再接下来就需要按业务流程把各业务环节分解成不借助于顺序的功效模块,然后把这多少个分拆出来的效劳分派给集群中不同的节点上去运算处理。而对于SDP用户来说,具备最基本的scala知识,无需明白akka、actor、threads、cluster,只要遵照SDP自定义的作业处理流形式就足以编写多线程分布式数据处理程序了。下面我就用部分文字及伪代码来叙述一下SDP的结构和法力:

   对于ASP.NET
WebAPI的优势和特点,在此间就不讲了,需要动用的自然就会拔取,也不需要自己浪费篇幅去上课这么些,这篇博文首要讲解ASP.NET
WebAPI中的HTTP信息的构造和处理信息的中坚目的。

 

    1.HttpRequestMessage目的解析:

         (1).HttpRequestMessage紧要性能和情势概述:

名称 说明
Version 获取或设置 HTTP 消息版本
Content 获取或设置 HTTP 消息的内容
Method 获取或设置 HTTP 请求信息使用的 HTTP 方法
RequestUri 获取或设置 HTTP 请求的 Uri
Headers 获取 HTTP 请求标头的集合
Properties 获取 HTTP 请求的属性集
ToString 返回表示当前对象的字符串

        该对象紧要用来表示 HTTP
请求音信。对于该目的的这一个属性和格局,大部分应该都不会陌生,因为一个HTTP信息中重大含有头部、音信内容等等,在这里关键介绍一个属性Properties,该属性并不属于此外正规的HTTP音信,当信息传输时,不会保留该属性。

         (2).Properties属性解析:

[__DynamicallyInvokable]
public IDictionary<string, object> Properties
{
    [__DynamicallyInvokable]
    get
    {
        if (this.properties == null)
        {
            this.properties = new Dictionary<string, object>();
        }
        return this.properties;
    }
}

   
有以上的代码可以很分明的观看该属性唯有一个只读属性,并回到一个IDictionary<string,
object>。当音讯在服务器或者客户端本地开展拍卖时,该属性用于保存附加的音信音信。该属性只是一个通用的器皿,保存本地音讯属性。(与接受信息的连接相关的客户端认证;将音讯与部署路由举行匹配,得到的路由数据)

除开fire-and-run类型的演算函数,SDP还应该提供针对性多线程或分布式程序的map-reduce式运算函数。起先想法是:无论再次来到结果与否,分派任务都是由persistence-actor来实施的,这样能担保不会挂一漏万任何任务。假设完全任务急需在富有分派任务再次回到运算结果后再统一开展深度运算时akka的actor音讯使得情势是最符合可是的了。具体情况可以参见我眼前关于cluster-sharding的博文。

   2..NET4.5本子成立HTTP POST请求实例:

async static void getResponse(string url)
        {
            using (HttpClient client = new HttpClient())
            {
                using (HttpResponseMessage response = await client.GetAsync(url))
                {
                    using (HttpContent content = response.Content)
                    {
                        string myContent = await content.ReadAsStringAsync();
                    }
                }
            }
        }
        async static void postResponse(string url)
        {
            while (true)
            {
                IEnumerable<KeyValuePair<string, string>> queries = new List<KeyValuePair<string, string>>()
            {
                new KeyValuePair<string, string> ("test","test")
            };
                HttpContent q = new FormUrlEncodedContent(queries);
                using (HttpClient client = new HttpClient())
                {
                    using (HttpResponseMessage response = await client.PostAsync(url, q))
                    {
                        using (HttpContent content = response.Content)
                        {
                            string myContent = await content.ReadAsStringAsync();

                            Console.WriteLine(myContent);
                        }
                    }
                }
            }
        }

完全来说SDP是由一或五个Stream组成的;每个Stream就表示一段程序。一段完整的顺序Stream是由流元素源Source、处理节点Process-Node(Flow)及数码输出终点Sink多少个环节组成,下边是一个名列前茅的主次框架:

 

  type UserFunc = R => R 

好了,不知不觉还有多少个钟头就进来2017倒计时了。急忙凑合着在跨入2018在此之前把这篇发表出去,刚好是2019年的结尾一篇博文。祝各位在新的一年中行事生活顺利!