前言

在tcp使用可扩展的私有协议,通常使用TLV(length type value)设计方式。L解决TCP粘包问题。TV提供扩展能力。接下来本笔记使用TV使用protobuf实现。 protobuf已经成为TV的事实标准,作为开发人员的瑞士军刀就不需要再作过多描述。

一、创建工程

创建一个名为tcp-protobuf-and-server的rust工程目录

1
cargo new tcp-protobuf-and-server --bin

二、实现length加头和去头工具函数

1.实现

  • 位于src/utils.rs
  • write_head_and_bytes 先向TcpStream写入数据头,再写入数据payload
  • read_head_and_bytes 先读取4字节定长数据,再读取数据
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
use std::io;
use std::io::{Read, Write};
use std::net::TcpStream;

pub fn write_head_and_bytes(mut stream: &TcpStream, data: &[u8]) -> io::Result<()> {
    // 存放头
    let buffer = (data.len() as u32).to_be_bytes();

    // 写入头
    stream.write_all(&buffer)?;

    // 写入body
    stream.write_all(data)?;

    Ok(())
}

pub fn read_head_and_bytes(mut stream: &TcpStream) -> io::Result<Vec<u8>> {
    let mut buffer = [0u8; 4];
    // 读取4个字节定长数据
    stream.read_exact(&mut buffer[..])?;

    // 计算buffer长度
    let size = u32::from_be_bytes(buffer);

    // 数据body
    let mut payload = vec![0; size as usize];

    // 读取数据body
    stream.read_exact(&mut payload[..])?;

    Ok(payload)
}

2.修改lib.rs

把utils.rs添加到模块里面,rust编译器有点懒,要编译的模块一个要告知,比如注册到lib.rs。不像go放到目录里面就给编译

1
echo 'pub mod utils;' >> src/lib.rs

三、使用protobuf

1.定义protobuf文件

位于protos/small-model.proto 位置,这里使用protobuf3的语法,可以看到syntax = "proto3"; 这行"

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
syntax = "proto3";

message HotWordRequest {
    string model_type = 1;
    string model_text=2;
    string session_id = 3;
}

message HotWordResponse {
    int64 errcode = 1;
    string errmsg = 2;
    string session_id = 3;
    bytes gen_model=4;
}

2.编写build.rs 编译脚本-生成protobuf代码

在很多编译语言,一般使用命令生成protobuf代码,rust默认里面使用build.rs,好处是修改protobuf文件,不会产生忘了重新生成代码的尴尬。 .out_dir指定代码生成位置。.inputs指定.proto文件位置。

1
2
3
4
5
6
7
8
9
fn main() {
    protobuf_codegen_pure::Codegen::new()
        .out_dir("src/protos")
        .inputs(&["protos/small-model.proto"])
        .include("protos")
        .run()
        .expect("Codegen failed.");
}

3.修改Cargo.toml

1
2
[build-dependencies]
protobuf-codegen-pure = "2.14"

4.告知编译器

4.1 修改lib.rs

1
echo 'pub mod protos;' >> src/lib.rs

4.2在protos目录添加mod.rs

  • 创建mod.rs
1
vim src/protos/mod.rs
  • 加入以下内容
1
2
pub mod small_model;
pub use small_model::{HotWordRequest, HotWordResponse};

四、实现客户端代码 client.rs

1. 使用protobuf文件

这里使用绝对路径引入模块,回到第一条, cargo new tcp-protobuf-and-server --bin创建工程, 在rust里面的包名,如果有中横岗-必须要换成_下滑线

1
use tcp_protobuf_and_server::protos::small_model::*;

创建请求结构体, xxx::new,这里的xxx是small-model.proto 里面定义的message名

1
2
3
4
    let mut req = HotWordRequest::new();
    req.model_text = c.data;
    req.model_type = t;

2. protobuf结构体序列化

1
let out_req = req.write_to_bytes().unwrap();

3. protobuf结构体反序列化

1
let rsp = protobuf::parse_from_bytes::<HotWordResponse>(&payload).unwrap();

4. 完整代码如下

src/client.rs 内容如下

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
use protobuf::Message;
use std::net::TcpStream;
use structopt::StructOpt;
use tcp_protobuf_and_server::protos::small_model::*;
use tcp_protobuf_and_server::utils::*;

#[derive(StructOpt, Debug)]
struct Client {
    /// tcp server address
    #[structopt(short, long)]
    addr: String,

    /// data
    #[structopt(short, long)]
    data: String,

    /// hotword
    #[structopt(short, long)]
    hot_world: bool,

    /// grammar
    #[structopt(short, long)]
    grammar: bool,

    /// small model
    #[structopt(short, long)]
    small_model: bool,
}

fn main() {
    // 解析命令行
    let c = Client::from_args();

    let t: String;

    if c.hot_world {
        t = "hot_world".to_string();
    } else if c.grammar {
        t = "grammar".to_string();
    } else if c.small_model {
        t = "small_model".to_string();
    } else {
        panic!("hotword(false), grammar(false), small-model(false)");
    }

    let mut req = HotWordRequest::new();
    req.model_text = c.data;
    req.model_type = t;

    // 0.protobuf结构体转成bytes
    let out_req = req.write_to_bytes().unwrap();

    // 1.连接服务端
    let stream = TcpStream::connect(c.addr).unwrap();

    // 2.写入数据,并加4个字节的头
    write_head_and_bytes(&stream, &out_req).unwrap();

    // 3.读取TLV数据,提取V
    let payload = read_head_and_bytes(&stream).unwrap();

    // 4.解析payload 到protobuf结构体里面
    let rsp = protobuf::parse_from_bytes::<HotWordResponse>(&payload).unwrap();

    // 5.读取rsp结构体里面数据
    let gen_model = rsp.get_gen_model();

    // 6.写入到终端
    println!("payload:{}", std::str::from_utf8(&gen_model).unwrap());
}

五、实现服务端

src/server.rs内容 服务端和客户端大同小异,不作展开

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
use protobuf::Message;
use std::io;
use std::net::{TcpListener, TcpStream};
use structopt::StructOpt;
use tcp_protobuf_and_server::protos::small_model::*;
use tcp_protobuf_and_server::utils::*;

#[derive(StructOpt, Debug)]
struct Server {
    /// server address
    #[structopt(short, long)]
    server_addr: String,
}

fn work_conn(stream: TcpStream) -> io::Result<()> {
    // 0.读取TLV数据,提取data
    let payload = read_head_and_bytes(&stream)?;
    // 1.解析protobuf数据到结构体
    let req = protobuf::parse_from_bytes::<HotWordRequest>(&payload)?;
    // 2.获取请求参数
    let model_text = req.get_model_text();

    // 3.追加一些数据,然后返回
    let rsp_model_text = format!(
        "#Server read <client data:{}> <client type:{}>",
        model_text,
        req.get_model_type()
    );

    // 4.声明响应
    let mut rsp = HotWordResponse::new();

    // 5.设置回写数据
    rsp.set_gen_model(rsp_model_text.as_bytes().to_vec());

    // 6.protobuf 结构体转成bytes
    let out_rsp = rsp.write_to_bytes().unwrap();

    // 7.写入客户端
    write_head_and_bytes(&stream, &out_rsp)?;
    Ok(())
}

impl Server {
    fn main_loop(&self) {
        // 创建listen
        let listener = TcpListener::bind(&self.server_addr).unwrap();

        for stream in listener.incoming() {
            let stream = match stream {
                Ok(stream) => stream,
                Err(e) => {
                    /* connection failed */
                    println!("{:#?}", e);
                    continue;
                }
            };

            work_conn(stream).unwrap();
        }
    }
}

fn main() {
    let s = Server::from_args();

    s.main_loop();
}

六、完整的Cargo.toml

如果要在rust工程编译多个可执行文件,可以使用[[bin]]标记 下面的配置,可以同时编译clientserver两个可执行文件

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
[package]
name = "tcp-protobuf-and-server"
version = "0.1.0"
authors = ["guonaihong <guonaihong@qq.com>"]
edition = "2018"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[[bin]]
name="client"
path="./src/client.rs"

[[bin]]
name="server"
path="./src/server.rs"

[build-dependencies]
protobuf-codegen-pure = "2.14"

[dependencies]
protobuf = "2.9"
structopt = "0.3"

七、运行

  • 运行服务端
1
./target/debug/server -s "0:8888"
  • 运行客户端
1
./target/debug/client -g -a "0:8888" -d "test word"