百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术文章 > 正文

基于HTTP2/3的流模式消息交换如何实现?

myzbx 2025-01-12 16:54 17 浏览

我想很多人已经体验过GRPC提供的三种流式消息交换(Client Stream、Server Stream和Duplex Stream)模式,在.NET Core上构建的GRPC应用本质上是采用HTTP2/HTTP3协议的ASP.NET Core应用,我们当然也可以在一个普通的ASP.NET Core应用实现这些流模式。不仅如此,HttpClient也提供了响应的支持,这篇文章通过一个简单的实例提供了相应的实现,源代码从这里下载。

一、双向流的效果
二、[服务端]流式请求/响应的读写
三、[客户端]流式响应/请求的读写

一、双向流的效果

在提供具体实现之前,我们不妨先来演示一下最终的效果。我们通过下面这段代码构建了一个简单的ASP.NET Core应用,如代码片段所示,在调用WebApplication的静态方法CreateBuilder将WebApplicationBuilder创建出来后,我们调用其扩展方法UseKestrel将默认终结点的监听协议设置为Http1AndHttp2AndHttp3,这样我们的应用将提供针对不同HTTP协议的全面支持。

var url = "http://localhost:9999";
var builder = WebApplication.CreateBuilder(args);
builder.WebHost
.UseKestrel(kestrel=> kestrel.ConfigureEndpointDefaults(
listen=>listen.Protocols = HttpProtocols.Http1AndHttp2AndHttp3))
.UseUrls(url);
var app = builder.Build();
app.MapPost("/", httpContext=> HandleRequestAsync(
httpContext,
async (request, writer) => {
Console.WriteLine(
$"[Server]Receive request message: {request}");
await writer.WriteStringAsync(request);
}));
await app.StartAsync();

await SendStreamRequestAsync(
url,
["foo", "bar", "baz", "qux"],
reply => {
Console.WriteLine(
$"[Client]Receive reply message: {reply}\n");
return Task.CompletedTask;
});

我们针对根路径(/)注册了一个HTTP方法为POST的路由终结点,终结点处理器调用HanleRequestAsync来处理请求。这个方法提供一个Func<string, PipeWriter, Task>类型的参数作为处理器,该委托的第一个参数表示接收到的单条请求消息,PipeWriter用来写入响应内容。在这里我们将接收到的消息进行简单格式化后将其输出到控制台上,随之将其作为响应内容进行回写。

在应用启动之后,我们调用SendStreamRequestAsync方法以流的方式发送请求,并处理接收到的响应内容。该方法的第一个参数为请求发送的目标URL,第二个参数是一个字符串数组,我们将以流的方式逐个发送每个字符串。最后的参数是一个Func<string,Task>类型的委托,用来处理接收到的响应内容(字符串),在这里我们依然是将格式化的响应内容直接打印在控制台上。

程序启动后控制台上将出现如上图所示的输出,客户端/服务端接收内容的交错输出体现了我们希望的“双向流式”消息交换模式。我们将在后续介绍HanleRequestAsync和SendStreamRequestAsync方法的实现逻辑。

二、[服务端]流式请求/响应的读写

HanleRequestAsync方法定义如下。如代码片段所示,我们利用请求的BodyReader和响应的BodyWriter来对请求和响应内容进行读写,它们的类型分别是PipeReader和PipeWriter。在一个循环中,在利用BodyReader将请求缓冲区内容读取出来后,我们将得到的ReadOnlySequence<byte>对象作为参数调用辅助方法TryReadMessage读取单条请求消息,并调用handler参数表示的处理器进行处理。当请求内容接收完毕后,循环终止。

static async Task HandleRequestAsync(
HttpContext httpContext,
Func<string, PipeWriter, Task> handler
)
{
var reader = httpContext.Request.BodyReader;
var writer = httpContext.Response.BodyWriter;
while (true)
{
var result = await reader.ReadAsync();
var buffer = result.Buffer;
while (TryReadMessage(ref buffer, out var message))
{
await handler(message, writer);
}
reader.AdvanceTo(buffer.Start, buffer.End);
if (result.IsCompleted)
{
break;
}
}
}

由于客户端发送的单条字符串消息长度不限,为了精准地将其读出来,我们需要在输出编码后的消息内容前添加4个字节的整数来表示消息的长度。所以在如下所示的TryReadMessage方法中,我们会先将字节长度读取出来,再据此将消息自身内容读取出来,最终通过解码得到消息字符串。

static bool TryReadMessage(
ref ReadOnlySequence<byte> buffer,
[NotWhen(true
)]out string? message)
{
var reader = new SequenceReader<byte>(buffer);
if (!reader.TryReadLittleEndian(out int length))
{
message = default;
return false;
}

message = Encoding.UTF8
.GetString(buffer.Slice(4, length));
buffer = buffer.Slice(length + 4);
return true;
}

响应消息的写入是通过如下针对PipeWriter的WriteStringAsync扩展方法实现的,这里的PipeWriter就是响应的BodyWriter,针对“Length + Payload“的消息写入也体现在这里。

public static class Extensions
{
public static ValueTask<FlushResult> WriteStringAsync(
this PipeWriter writer, string content
)
{
var length = Encoding.UTF8.GetByteCount(content);
var span = writer.GetSpan(4 + length);
BitConverter.TryWriteBytes(span, length);
Encoding.UTF8.GetBytes(content, span.Slice(4));
writer.Advance(4 + length);
return writer.FlushAsync();
}
}

三、[客户端]流式响应/请求的读写

客户端利用HttpClient发送请求。针对HttpClient的请求通过一个HttpRequestMessage对象表示,其主体内容体现为一个HttpContent。流式请求的发送是通过如下这个StreamContent类型实现的,它派生于HttpContent。我们重写了SerializeToStreamAsync方法,利用自定义的StreamContentWriter将内容写入请求输出流。

public class StreamContent(StreamContentWriter writer) : HttpContent
{
private readonly StreamContentWriter _writer = writer;
protected override Task SerializeToStreamAsync(
Stream stream,
TransportContext? context
)
=> _writer.SetOutputStream(stream).WaitAsync();
protected override bool TryComputeLength(out long length)
=> (length = -1) != -1;
}

public class StreamContentWriter
{
private readonly TaskCompletionSource<Stream> _streamSetSource = new();
private readonly TaskCompletionSource _streamEndSource = new();
public StreamContentWriter SetOutputStream(Stream outputStream)
{
_streamSetSource.SetResult(outputStream);
return this;
}

public async Task WriteAsync(string content)
{
var stream = await _streamSetSource.Task;
await PipeWriter.Create(stream).WriteStringAsync(content);
}

public void Complete()
=> _streamEndSource.SetResult();
public Task WaitAsync()
=> _streamEndSource.Task;
}

StreamContentWriter提供了四个方法,SetOutputStream方法用来设置请求输出流,上面重写的SerializeToStreamAsync调用了此方法。单条字符串消息的写入实现在WriteAsync方法中,它最终调用的依然是上面提供的WriteStringAsync扩展方法。整个流式请求的过程通过一个TaskCompletionSource对象提供的Task来表示,当客户端完成所有输出后,会调用Complete方法,该方法进一步调用这个TaskCompletionSource对象的SetResult方法。由于WaitAsync方法返回TaskCompletionSource对象提供的Task,SerializeToStreamAsync方法会调用此方法等待”客户端输出流“的终结。

如下的代码片段体现了SendStreamRequestAsync方法的实现。在这里我们创建了一个表示流式请求的HttpRequestMessage对象,我们将协议版本设置为HTTP2,作为主体内容的HttpContent正式根据StreamContentWriter对象创建的StreamContent对象。

static async Task SendStreamRequestAsync(
string url,
string[] lines,
Func<string, Task> handler
)
{
using var httpClient = new HttpClient();
var writer = new StreamContentWriter();
var request = new HttpRequestMessage(HttpMethod.Post, url)
{
Version = HttpVersion.Version20,
VersionPolicy = HttpVersionPolicy.RequestVersionExact,
Content = new StreamingWeb.StreamContent(writer)
};
var task = httpClient.SendAsync(
request, HttpCompletionOption.ResponseHeadersRead);
_ = Task.Run(async () =>
{
var response = await task;
var reader = PipeReader.Create(
await response.Content.ReadAsStreamAsync());
while (true)
{
var result = await reader.ReadAsync();
var buffer = result.Buffer;
while (TryReadMessage(ref buffer, out var message))
{
await handler(message);
}
reader.AdvanceTo(buffer.Start, buffer.End);
if (result.IsCompleted)
{
break;
}
}
});

foreach (string line in lines)
{
await writer.WriteAsync($"{line} ({DateTimeOffset.UtcNow})");
await Task.Delay(1000);
}
writer.Complete();
}

我们将这个HttpRequestMessage作为请求利用HttpClient发送出去,实际上发送的内容最终是通过调用StreamContentWriter对象的WriteAsync方法输出的,我们每隔1秒发送一条消息。HttpClient将请求发出去之后会得到一个通过HttpResponseMessage对象表示的响应,在一个异步执行的Task中,我们根据响应流创建一个PipeReader对象,并在一个循环中调用上面定义的TryReadMessage方法逐条读取接收到的单条消息进行处理。

相关推荐

Three.js导航网格:数据结构设计与实现

导航网格或navmesh是一种用于虚拟环境中路线规划的数据结构,在游戏开发中特别有用。它由一组代表地图区域的凸多边形组成,多边形的边用额外的连接信息进行了注释,显示了游戏角色可以穿越的区域。正如你...

苹果公司申请新专利,汽车AR导航这些功能都能实现

近日,美国专利商标局公布了苹果公司一项新的专利申请,这项新专利与汽车AR(增强现实)导航有关,可为下一代地图导航提供支持。用户通过AR设备可以直观地在3D模型中看到何时转向、减速或掉头等提示。据悉,苹...

运行时修改内存中的Dalvik指令来改变代码逻辑

一、前言最近在弄脱壳的时候发现有些加固平台的加固方式是修改了dex文件结构,然后在加载dex到内存的时候,在进行dex格式修复,从而达到了apk保护的效果,那么在dex加载到内存的时候,如何进行dex...

流放之路2(0.2.0g)版本5月1日更新

以下内容为谷歌加gpt翻译,有不妥之处请自行谅解:2025年5月1日中午12:00(GMT+8):由于技能宝石消失问题,本次补丁已回滚。我们会尽快重新发布此补丁。2025年5月1日下午12:30(...

Win10桌面/手机版最深层次开发功能挖掘

IT之家讯Win10开发者预览版为我们提供了一个Win10大框架的早期概览,使开发者与热心用户都可以提前感受Win10带来的新特性,尝试新工具,而作为开发者,最关心的莫过于Windows多平台通用应...

旅行规划太复杂?Deepchat结合高德地图MCP,轻松搞定杭州清明游

我们憧憬美好旅行的同时,旅游规划往往成为一道难以跨越的门槛。路线如何安排最合理?交通怎样选择最便捷?景点该如何取舍才能避开人潮?这些问题常常让人望而却步。如今,随着高德地图MCP服务的发布,结合Dee...

yaml基础语法讲解(yaml文件如何使用)

YAML基础它的基本语法规则如下:大小写敏感使用缩进表示层级关系缩进时不允许使用Tab键,只允许使用空格。缩进的空格数目不重要,只要相同层级的元素左侧对齐即可#表示注释,从这个字符一直到行尾,都会...

谷歌向更多开发者开放地图平台 助推增强现实游戏发展

来源:cnBeta谷歌周一宣布了将向所有人开放GoogleMapsPlatform的消息,以便开发者能够将地图数据纳入它们的游戏中。借助新工具,开发商可轻松打造类似《精灵宝可梦Go》的增强现...

咖啡一点通 | 拉花是门技巧,学会这些方法可以快速入门

了解更多精彩内容,欢迎关注【世界咖啡馆地图公众号ID:cafe_maps】咖啡表面的漂亮拉花,是运用牛奶、奶泡来制作,咖啡拉花有其难度与技巧,拉花时,要考量不同的咖啡杯大小而调整倒入的距离,倒入时...

scala基础教程之-数据类型(scala类的定义)

一、基本的数据类型数据类型描述Byte8位有符号值。范围从-128到127Short16位有符号值。范围从-32768至32767Int32位有符号值。范围从-2147483648to2...

Google Maps React 组件宣布开源(react开发chrome插件)

大家好,很高兴又见面了,我是"高级前端进阶",由我带着大家一起关注前端前沿、深入前端底层技术,大家一起进步,也欢迎大家关注、点赞、收藏、转发!前言当MapsJavaScriptA...

为什么、何时以及如何创建顾客体验地图?

导读:体验地图结合了两种强大的工具——讲故事和可视化——帮助团队理解和解决顾客的需求。虽然根据场景和业务目标不同,顾客体验地图会采取多种形式,但是通常会包含某些通用元素,并遵循一些基本的指导原则。本文...

Apple Watch 开发者套件 WatchKit 发布后,你需要知道这些

Apple上线了共AppleWatch开发者使用的开发工具WatchKit。从这款工具和相关文档中,TheVerge发现了一些重要细节。AppleWatch几乎无法独立使用根据开发者文...

Android 开发系列教程之(一)Android基础知识

什么是AndroidAndroid一词最早是出现在法国作家维里耶德利尔·亚当1986年发表的《未来夏娃》这部科幻小说中,作者利尔·亚当将外表像人类的机器起名为Android,这就是Android小人名...

Overture Maps Foundation 发布全球交通数据集

据Gpsworld1月1日报道,OvertureMapsFoundation发布了新的全球交通数据集,这一数据集覆盖了全球8600万公里的道路。该数据集的发布标志着一个重要的里程碑,旨在为开发...