前言

最近需要在rust里面使用多进程,有进程锁是第一步,毕竟进入临界区不能裸奔。 如果遥远的记忆没有出错,fcntl基于fd也可以实现进程锁,但是不是强制性的。所以这里选择有名信号量。

一、有名信号量c语言函数原型

1.sem_open()函数

sem_open() creates a new POSIX semaphore or opens an existing semaphore. The semaphore is identified by name. For details of the construction of name, see sem_overview(7).

1
sem_t *sem_open(const char *name, int oflag, mode_t mode, unsigned int value);

decrements (locks) the semaphore pointed to by sem. If the semaphore’s value is greater than zero, then the decrement proceeds, and the function returns, immediately. If the semaphore currently has the value zero, then the call blocks until either it becomes possible to perform the decrement (i.e., the semaphore value rises above zero), or a signal handler interrupts the call.

2.sem_wait()函数

1
int sem_wait(sem_t *sem);

sem_post() increments (unlocks) the semaphore pointed to by sem. If the semaphore’s value consequently becomes greater than zero, then another process or thread blocked in a sem_wait(3) call will be woken up and proceed to lock the semaphore

3.sem_post()函数

1
int sem_post(sem_t *sem);

二、信号量原理

有名信号量的原理,类似如下伪代码,wait()函数发现资源分配完了,就会等待,post()函数的作用就是返还资源数,并且唤醒在wait()上等待的进程,或者线程。最后说下sem_open,最后一个参数可以指定资源数。资源数保证临界区同时有几个进程或者线程进入。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
type sem struct {
	count int
}

func (s *sem) wait() {
	for s.count == 0 {
	}
	s.count--
}

func post() {
	// 1.累加
	s.count++
	// 2.唤醒等待的(线/进)程
}

三、rust里面调用有名信号量

1.Cargo.toml配置

1
2
[dependencies]
libc="0.2"

2.rust里面调用有名信号量,最小化example

了解到原理,再使用sem就简单了,可以初始化资源数为1,然后使用wait和post包裹业务逻辑。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
use libc;
use std::ffi::CString;

fn main() {
    let path = CString::new("asr.lock.dat").expect("CString::new failed");

    unsafe {
        libc::sem_unlink(path.as_ptr());

        // 打开, 最后一个参数指定有一个进程可以进入临界区
        let sem = libc::sem_open(path.as_ptr(), libc::O_CREAT | libc::O_EXCL, 0644, 1);

        // 申请,类似于lock
        libc::sem_wait(sem);

        println!("hello");

        // 归还,类似于unlock
        libc::sem_post(sem);

        libc::sem_close(sem);
    }
}

四、测试进程锁

1.Cargo.toml配置

1
2
3
[dependencies]
nix = "0.16.1"
libc = "0.2"

2.测试代码

测试代码会fork两个进程,对同一个文件,读里面数字,累加之后。再写回去。如果数字正常,说明没有问题。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
use libc;
use nix::unistd::{fork, ForkResult};
use std::ffi::CString;
use std::fs::File;
use std::fs::OpenOptions;
use std::io::prelude::*;
use std::io::SeekFrom;

unsafe fn add_num(file_name: &str, lock: *mut libc::sem_t) {
    // 打开文件
    let mut file = OpenOptions::new()
        .read(true)
        .write(true)
        .create(true)
        .open(file_name)
        .unwrap();

    for _ in 0..2000 {
        // 如果为lock第值为0就等待, 不为0减去1
        libc::sem_wait(lock);

        // 数据buffer
        let mut buffer = String::new();

        // 设置文件指针到0号位置
        file.seek(SeekFrom::Start(0)).unwrap();

        // 从文件读取到buffer里面
        file.read_to_string(&mut buffer).unwrap();

        // buffer转成数字
        let mut size = buffer.parse::<i32>().unwrap();

        // 累加
        size += 1;

        // 数字转成string
        let to_data = format!("{:04}", size);

        // 偏移到文件指针0号位置
        file.seek(SeekFrom::Start(0)).unwrap();

        // 写到文件里面
        file.write(&to_data[..].as_bytes()).unwrap();

        // lock的值+1
        libc::sem_post(lock);
    }
}

fn main() {
    let file_name = "./test.dat";

    let mut f = File::create(file_name).unwrap();
    f.write(b"0000").unwrap();

    // 创建有名信号量字符串
    let path = CString::new("asr.lock.dat").expect("CString::new failed");

    unsafe {
        libc::sem_unlink(path.as_ptr());
        let lock = libc::sem_open(path.as_ptr(), libc::O_CREAT | libc::O_EXCL, 0644, 1);

        // fork父子进程
        match fork() {
            Ok(ForkResult::Parent { child, .. }) => {
                println!(
                    "Continuing execution in parent process, new child has pid: {}",
                    child
                );
                add_num(file_name, lock);
            }

            Ok(ForkResult::Child) => add_num(file_name, lock),
            Err(_) => println!("Fork failed"),
        }
    }
}

五、TODO 生产者消费者模式

TODO可以用sem完成生产者消费者模式练练手。