RPC(Remote Procedure Call)远程过程调用。简单来说,就是我在本地调用了一个函数,或者对象的方法,实际上是调用了远程机器上的函数,或者远程对象的方法,但是这个通信过程对于程序员来说是透明的。

gRPC 是一个高性能、开源的远程过程调用(RPC)框架,由 Google 开发并开源。它基于 HTTP/2 协议进行通信,使用 Protocol Buffers 作为接口定义语言(IDL),支持多种编程语言,并提供了诸如负载均衡、认证、流控等功能。

gRPC 的主要特点包括:

  1. 高效性能:采用 HTTP/2 和 Protocol Buffers 等技术,实现了低延迟和高吞吐量。
  2. 多语言支持:支持多种编程语言,如 C/C++, Java, Python, Go 等。
  3. 自动代码生成:通过 Protocol Buffers 定义服务接口和消息格式,并自动生成客户端和服务器端的代码。
  4. 支持双向流式传输:可以在单个连接上同时发送请求和响应数据。

image-20240901111003799

Protocol Buffers 由Google定义的一个与语言和平台无关具有可拓展的用于序列化结构化的数据(例如:XML、JSON)的协议。但更小、更快、更简单。您只需定义数据的结构化方式,然后就可以使用特殊生成的源代码轻松地向各种数据流写入和读取结构化数据,并可以被各种语言使用。

gPRC 明确了 ChannelRPCMessage 三个概念:

  1. Channel:Channel 表示客户端与服务器之间的通信通道。它负责管理连接和数据传输,并提供了一种高效的方式来进行远程过程调用(RPC)。
  2. RPC:Remote Procedure Call(远程过程调用)是指客户端应用程序通过网络请求执行在另一个计算机上运行的函数或方法。gRPC 使用 RPC 作为主要通信模式,允许客户端应用程序像调用本地函数一样调用远程服务。
  3. Message:Message 是 gRPC 中定义消息格式和数据结构的基本单元。使用 Protocol Buffers 定义消息格式,并通过这些消息在客户端和服务器之间进行数据交换。

image-20240901131130272

每个Channel可能有许多RPC(一个RPC相当于一个HTTP/2连接),而每个RPC可能有许多Message(一个Message即为一个会话)。

定义 Protocol

大概了解了 gRPC 的一些概念之后就开始尝试定义 Protocol,Protocol 是 gRPC 跨语言的基础。

首先一个 Protocol 文件要有syntaxpackage两个属性,用于声明proto版本和包名(包名用于防治消息命名冲突)

syntax = "proto3"; // 语法proto3
package hello; // 包名

然后就可以开始定义 RPC 交互的结构,一个应用可以对外暴露多个服务,每一个服务都是一个service 代码块,service 内部通过 rpc 代码段声明暴露的方法以及方法的入参和出参类型,然后通过message 代码块声明请求和响应结构(message 在proto语法中表示一个对象,即JS中的object)。

service Hello {
    rpc SayHello (SayHelloRequest) returns (SayHelloResponse) {}
}

message SayHelloRequest {
    string name = 1;
}

message SayHelloResponse {
    string message = 1;
}

在定义消息体时,字段属性可以有以下类型,分别对应JS中的类型如下

.proto Types JS Types
double Number
float Number
int32 Number
int64 Number
uint32 Number
uint64 Number
sint32 Number
sint64 Number
fixed32 Number
fixed64 Number
sfiexed32 Number
sfiexed64 Number
bool Boolean
string String
bytes Uint8Array

除了基本类型以外还可以使用复杂类型,像枚举、对象、数组等

message SayHelloResponse {
    string message = 1;
    Status status = 2;
    User user = 3; // 对象类型
    repeated Status status = 4; // 数组
}

enum Status { // 定义枚举
  NORMAL = 0;
  FREEZE = 1;
  DELETE = 2;
}

message User {
  string name = 1;
  int32 age = 2;
}

使用 gRPC 通信

使用protocol 有两种方式,一种是基于 ProtoBuf 在运行时动态处理,另一种是基于 protoc 等编译器预先生成代码。我们这里先通过运行时加载的形式来进行测试。

首先安装依赖包

$ pnpm install @grpc/proto-loader @grpc/grpc-js

@grpc/proto-loader 用于运行时解析Protocol文件,@grpc/grpc-js 用于提供 gRPC server和client能力。

server 和 client 的运行时解析 Protocol 的过程是一致的

import grpc from '@grpc/grpc-js'
import protoLoader from '@grpc/proto-loader'
import { dirname } from 'path'
import { fileURLToPath } from 'url'

const __dirname = dirname(fileURLToPath(import.meta.url))
const packageDescriptor = protoLoader.loadSync(
  __dirname + '/hello.proto',
  {
    keepCase: true
  }
)

const helloPackage = grpc.loadPackageDefinition(packageDescriptor).hello

定义 server

在 server 中实现 Protocol 中定义的方法,我们根据上一小节的 proto 文件来进行实现

// ... load proto

function SayHello(call, callback) {
  callback(null, { message: 'Hello ' + call.request.name })
}

const server = new grpc.Server()
server.addService(helloPackage.Hello.service, {
  SayHello,
})

server.bindAsync('0.0.0.0:33321', grpc.ServerCredentials.createInsecure(), () => {
  console.log('server running on prot 33321')
  server.start()
})

定义 client

加载完 proto 之后,在 client 中创建 RPC 示例(需要指定需要调用服务的 host),然后执行对应的方法即可。

// load proto

const callHello = new helloPackage.Hello('localhost:33321', grpc.credentials.createInsecure())

callHello.SayHello({ name: 'Leo' }, (err, response) => {
  if (err) { return console.error(err) }
  console.log('Response: ', response.message)
})

最终的调用结果如下,可以看到已经获取正确的响应消息。

image-20240901145453362

在 MidwayJS 中使用 gRPC

首先安装依赖

$ npm i @midwayjs/grpc@3 --save
$ npm i @midwayjs/grpc-helper --save-dev

然后开启组件

import { Configuration, App } from '@midwayjs/core';
import { join } from 'path';
import * as koa from '@midwayjs/koa';
import * as grpc from '@midwayjs/grpc';

@Configuration({
  imports: [
    // ...other components
    grpc,
  ],
  importConfigs: [join(__dirname, './config')],
})
export class MainConfiguration {
  @App('koa')
  app: koa.Application;

  async onReady() {
  }
}

在配置文件中声明需要暴露的 service

export default (appInfo: MidwayAppInfo): MidwayConfig => ({
  // ... other config
  grpcServer: {
    services: [
      {
        protoPath: join(appInfo.appDir, 'proto/hello.proto'),
        package: 'hello',
      },
    ],
  },
});

然后添加 proto/hello.proto声明文件

syntax = "proto3"; // 语法proto3

package hello; // 包名

service Hello {
    rpc SayHello (SayHelloRequest) returns (SayHelloResponse) {}
}


message SayHelloRequest {
    string name = 1;
}

message SayHelloResponse {
    string message = 1;
}

在package.json中添加编译命令,通过proto转译出ts类型

"generate": "tsproto --path proto --output src/domain"

创建src/provider/hello.ts,用于实现对外暴露的方法

import { MSProviderType, Provider, GrpcMethod } from '@midwayjs/core';
import { hello } from '../domain/hello';

/**
 * 实现 hello.Hello 接口的服务
 */
@Provider(MSProviderType.GRPC, { package: 'hello' })
export class Hello implements hello.Hello {
  @GrpcMethod()
  async sayHello(request: hello.SayHelloRequest) {
    return { message: 'Hello ' + request.name };
  }
}

此时启动dev server,通过刚才的client访问可以实现相同的效果。

MidwayJS 也支持客户端调用,添加一段grpc的客户端配置

//...
grpc: {
  services: [
    {
      url: 'localhost:6565',
      protoPath: join(appInfo.appDir, 'proto/hello.proto'),
      package: 'hello',
    },
  ],
},

然后就可以获取客户端进行调用

import { Inject, Provide } from '@midwayjs/core';
import { Clients } from '@midwayjs/grpc';
import { hello } from '../domain/hello';

@Provide()
export class UserService {
  @Inject()
  clients: Clients;

  async hello() {
    const helloService =
      this.clients.getService<hello.HelloClient>('hello.Hello');
    const result = await helloService
      .sayHello()
      .sendMessage({ name: 'midway' });
    return result.message;
  }
}

流式调用

在 Midway 中,流式的 gRPC 调用变得非常简单,首先来看下流式接口的proto定义

syntax = "proto3";
package stream;

service HelloStream {
  rpc BidirectionalFlow (stream StreamRequest) returns (stream StreamResponse) {}
  rpc ClientStream (stream StreamRequest) returns (StreamResponse) {}
  rpc ServerStream (StreamRequest) returns (stream StreamResponse) {}
}

message StreamRequest {
  int32 id = 1;
  int32 num = 2;
}

message StreamResponse {
  int32 id = 1;
  int32 num = 2;
}

三个方法分别是双工流,客户端推送流,服务器推送流。

服务端推送流

服务器推送流的场景中,客户端发送一次消息,服务器可以进行多次写入,直到结束写入,服务端的实现如下

@Provider(MSProviderType.GRPC, { package: 'stream' })
export class HelloStream implements stream.HelloStream {
  @Inject()
  ctx: Context;

  @GrpcMethod({ type: GrpcStreamTypeEnum.WRITEABLE }) // 写入流
  async serverStream(request: stream.StreamRequest) {
    this.ctx.write({
      num: 1 + request.num,
    });
    this.ctx.write({
      num: 2 + request.num,
    });
    this.ctx.write({
      num: 3 + request.num,
    });

    this.ctx.end(); // 结束推送
  }
}

客户端实现如下

import { Clients } from '@midwayjs/grpc';

@Provide()
export class UserService {
  @Inject()
  clients: Clients;

  async serverStream() {
    const streamService =
      this.clients.getService<stream.HelloStreamClient>('stream.HelloStream');

    const result = await streamService
      .serverStream()
      .sendMessage({ id: 1, num: 0 });
    let total = 0;
    result.forEach(data => {
      total += data.num;
    });
    return total; // 6
  }
}

客户端推流

客户端推流场景中,客户端可以发送多次消息,服务端只在客户端完成推流后响应一次,服务端实现突下,通过onEnd函数绑定客户端写入完成时要执行的回调函数

@Provider(MSProviderType.GRPC, { package: 'stream' })
export class HelloStream implements stream.HelloStream {
  @Inject()
  ctx: Context;

  sumDataList: number[] = [];

  @GrpcMethod({ type: GrpcStreamTypeEnum.READABLE, onEnd: 'end' })
  async clientStream(data: stream.StreamResponse) {
    this.sumDataList.push(data.num);
  }

  end() {
    return { num: this.sumDataList.reduce((a, b) => a + b) };
  }
}

客户端实现如下

import { Clients } from '@midwayjs/grpc';

@Provide()
export class UserService {
  @Inject()
  clients: Clients;

  async clientStream() {
    const streamService =
      this.clients.getService<stream.HelloStreamClient>('stream.HelloStream');

    const result = await streamService
      .clientStream()
      .sendMessage({ num: 2 })
      .sendMessage({ num: 5 })
      .sendMessage({ num: 6 })
      .sendMessage({ num: 7 })
      .end(); // 结束推送
    return result.num; // 20
  }
}

双工流

双工流中客户端和服务端都是通过流进行传输的,方式即为上面两种方式的组合,服务端实现如下

@Provider(MSProviderType.GRPC, { package: 'stream' })
export class HelloStream implements stream.HelloStream {
  @Inject()
  ctx: Context;

  @GrpcMethod({ type: GrpcStreamTypeEnum.DUPLEX, onEnd: 'duplexEnd' })
  async bidirectionalFlow(data: stream.StreamRequest) {
    this.ctx.write({
      id: data.id,
      num: data.num + 12,
    });
  }
  
  duplexEnd() {
    console.log('duplexEnd');
  }
}

值得注意的是客户端中无法保证调用与响应的顺序,如果在proto中定义id,在服务端处理数据时将传入的id返回,可以保证某个响应执行对应的回调

bidirectionalFlow() {
  const streamService =
    this.clients.getService<stream.HelloStreamClient>('stream.HelloStream');

  const t = streamService.bidirectionalFlow();

  t.sendMessage({ num: 1 }).then(res => {
    console.log(res.num);
  });
  t.sendMessage({ num: 4 }).then(res => {
    console.log(res.num);
  });
  t.end();
}

前端小白