Arrow_rpc

Posted by MegaBillow on Thursday, August 26, 2021

主要是翻译

  1. https://arrow.apache.org/blog/2019/10/13/introducing-arrow-flight/
  2. https://arrow.apache.org/docs/format/Flight.html

Arrow Flight 作为 Arrow 的高性能 RPC 框架。 如何达到高性能的?

Flight 针对 Arrow record batch 流的上下行传输。一组 metadata 方法用于发现和解释这些流,并且支持应用特定的方法。

方法和消息的格式使用 protobuf 定义。在实现上进行了进一步的优化,降低使用 protobuf 中的开销(例如避免过多的 memory copy)。

目标

针对大数据集的网络传输。 现有的基于文件的方式要求在反序列化之前传输到 local host。

而Arrow 的 columnar format 有以下特点:

  • on-the-wire 的表格数据表示方式不需要在接收方进行反序列化
  • streaming batches 的模式,一次传输一组 row
  • 支持 11 种语言

ODBC/JDBC 在提供给公开接口使用时,需要实现自身的 on-wire 二进制格式序列化。 而 Flight 则保持 on-wire 的数据与公开接口使用的数据采用相同的表示。

特点

  • 并行传输, 允许数据以 stream 的方式在一组 servers 和 client 之间传输。

基础请求

  • Handshake
  • ListFlight,返回可用的 data streams 列表
  • GetSchema,返回 data stream 的 schema
  • GetFlightInfo,返回数据集的访问 plan ,可能要求消费多个 data streams
  • DoGet,发送数据到 client
  • DoPut,接收client 的数据
  • DoAction,实现指定的 action,并返回结果
  • ListActions,返回可用的 action types

ProtoBuf 的定义参考 https://arrow.apache.org/docs/format/Flight.html#protocol-buffer-definition

利用 HTTP/2 的 streaming 实现双向 stream 数据收发。

Data stream 由 descriptor 进行标识,即下文中的 FlightDescriptor。 descriptor 可以是 path 或 任意的二进制命令,对应 protobuf 中的 DescriptorType, 包含 PATH 和 CMD 两个 enum。其中 PATH 是类似于文件系统路径的概念,而 CMD 则在注释中为用于生成一个 dataset 的 opaque cmd。

client 下载数据的主要流程是:

  1. 构建或获取所需 data set 对应的 FlightDescriptor 。 如果已知直接构建,如果未知则通过 ListFlight 方法进行获取。(ListFLight 发往哪里?)
  2. 调用 GetFlightInfo(FlightDescriptor) 获得 FlightInfo。 FlightInfo,包含 data 的 location、schema、 dataset size 等。数据的 metadata 与实际数据可能分布在不同的 server,所以会返回包含实际数据的 servers 信息。FlightInfo 还包含 Ticket 唯一对应请求所需的数据, 起到 data token 的作用。 可能涉及 data 的 route 过程。
  3. 连接到数据所在的 servers。
  4. 调用 DoGet(Ticket) 取得 record batches 的 data stream。

client 上传数据的流程与下载类似,差别在于最后调用 DoPut 上传 record batches stream。

Stream Management

Data stream 是 Flight 管理和处理的重点。

  • Parrallel consumption and locality awareness
    • Flight 是由 streams 组成
    • 每个 stream 有一个 FlightEndpoint:一个 opaque stream ticket 包含 consumption location
    • 利用 location 信息增强 data locality
  • Flight descriptor 的两个类型
    • path,例如: marketing.yesterday.sales
    • 任意二进制命令,例如:select a,b from foo where c > 10
  • Stream 列举
    • ListFlight(Criteria)
    • GetFlightInfo(FlightDescriptor)

Error handling

Flight 定义了一组 error codes,不同语言在实现上有所不同。详细内容参考: https://arrow.apache.org/docs/format/Flight.html

基于gRPC 的数据吞吐优化

与数据相关的类型是 FlightData

  • Generate the Protobuf wire format for FlightData including the Arrow record batch being sent without going through any intermediate memory copying or serialization steps.
  • Reconstruct a Arrow record batch from the Protobuf representation of FlightData without any memory copying or deserialization.

基于此设计,在实际使用 Flight 的很多真实应用中的瓶颈在与网络带宽。

水平扩展:并行和分区的数据获取

很多分布式数据查询系统都有 coordinator (Impala、Presto、Druid 等)角色接收用户请求进行路由,并将结果返回用户。这样的设计导致数据的多次传输,在大数据集场景存在扩展性问题。

A client request to a dataset using the GetFlightInfo RPC returns a list of endpoints, each of which contains a server location and a ticket to send that server in a DoGet request to obtain a part of the full dataset. 需要所有的service 都知道全局的service 信息。如何维护与同步这些全局信息?

整体呈现 multiple-endpoint 模式,这样带来的收益包括:

  • clients 可以并行访问这些 endpoints
  • 提供 GetFlightInfo 请求的 service 可以将工作委派给兄弟 services,更好地利用数据 locality 或实现负载均衡。
  • server 可以承担不同的角色,如一组节点负责 planner (GetFlightInfo),另一组作为 data provider (DoGet, DoPut)。(planner 不是同样与 coordinator 类似?)

架构示例图

应用业务逻辑的扩展

GetFlightInfo 请求支持在请求 dataset 时,发送透明的序列化命令执行其他类型的操作(类似单消息多指令?)。举例是,client 可以请求某个 dataset 常驻内存( pinned in memory )从而加速后续请求的响应速度。

Flight service 可以定义 actions ,用于 DoAction 请求。一个 action 请求包含 action 的名称以及所需要的信息数据。返回结果是一个opaque 的二进制 gRPC stream 。

一些示例:

  • Metadata 的发现,提供内置的 ListFights 请求之外的能力
  • 设置 session 相关参数与配置

加密与认证

Flight 支持现有的 gRPC 内置的 TLS/OpenSSL 加密能力。

对于认证,支持基础 user/password 的 BasicAuth 方法。

中间件与 Tracing

基于 gRPC 的 interceptors 概念,允许开发自定义的中间件,作用于进出的请求进行观测,一个例子就是 OpenTracing 。(这里的中间件从效果来看就类似于 java Spring 中的 AOP 模式)。

不仅仅是 gRPC

定义 server location 依照 RFC 3986 标准的 URI 格式,例如: grpc+tls://$HOST:$PORT 表示 TLS加密的 gRPC。

gRPC 作为 Flight server 的命令层是适合的,然而在数据传输层除了 TCP 之外还可能采用如 RDMA 等其他协议。

进一步需要了解问题

  • Flight service 的地址如何确定?特别是一组 servers 的情况下。
  • service 之间是否对等无状态?