进程间通讯——消息队列、信号量以及共享内存

xiaoxiao2021-02-28  11

本篇博客见《unix环境高级编程》的进程通信那一节,看到那一节就把这个写到博客里面啦,加深记忆。方便以后复习用……

消息队列,信号量,共享内存这三种我们称为XSI IPC,它们之间有很多相似之处。

在了解这三个之前,我们先来了解标识符和键。

 每个内核中的IPC结构(这里指消息队列,信号量和共享存储段)都用一个非负整数的标识符加以引用。例如,为了对一个消息队列发送或读消息时,只需要知道

其队列的标识符。

标识符是IPC对象的内部名。为使多个合作进程能够在同一IPC对象上会和,需要提供一个外部名,因此使用了键(key),每个IPC对象都与一个键相关联,于是键

就用作为该对象的外部名。

无论何时创建IPC结构(调用 msgget、semget 或shmget),都应指定一个键(key),键的数据类型由系统规定为 key_t,通常在头文件<sys/types.h>中被规定为长整型。

键由内核变换成标识符。

三个get函数(msgget、semget和shmget)都有两个类似的参数:一个key和一个整型flag。如若满足下列两个条件之一,则创建一个新的IPC结构(服务器进程创建)

(1)key是IPC_PRIVATE

(2)key当前未与特定类型的IPC结构相结合,并且flag中指定了IPC_CREAT位。

为访问现存的队列,key必须等于创建该队列时所指定的键,并且不应该指定IPC_CREAT。

注意,访问一个现存的队列,决不能指定IPC_PRIVATE作为键。因为这是一个特殊的键值,它总是用于创建一个新队列。为了访问一个用IPC_PRIVATE键创建的现存队列,

一定要知道与该队列相结合的标识符,然后在其他IPC调用中(如msgsnd和msgrcv)使用该标识符。

权限:: XSI  IPC权限

接下来我们来看消息队列:

消息队列是消息的链接表,存放在内核中并由消息队列标识符的标识。

每个队列都有一个msqid_ds结构与其相关联: struct msqid_ds{ struct ipc_perm        msg_perm; //ipc结构 msgqnum_t              msg_qnum; //队列中消息的数量 msglen_t                  msg_qbytes; //队列中最大的字节数 pid_t                         msg_lspid; //最后调用msgsnd的pid pid_t                         msg_lrpid; //最后调用msgrcv的pid time_t                       msg_stime; //最后调用msgsnd函数的时间 time_t                       msgrtime; //最后调用msgrcv函数的时间 time_t                       msg_ctime; //最后change的时间 ...... };

消息队列的函数:

#include   <sys/msg.h>

(1)msgget用于创建新队列或打开一个现存的队列。

        int msgget(key_t   key,  int   flag);

        返回值:成功返回消息队列的ID,出错返回-1;此后该值就可被用于其他三个消息队列的函数。

        参数:key:键,可以看上面的介绍,这个我们一般给一个非负整数,

                   flag:权限位

(2)msgctl函数对队列执行多种操作。

        int    msgctl(int  msqid,int   cmd,struct  msqid_ds *buf);

       返回值:成功返回0,出错-1

       参数:msqid:msgget的返回值

                 cmd

(3)调用msgsnd将数据放到消息队列中。

 int    msgsnd(int    msqid,const  void *ptr,size_t  nbyte,int flag);成功返回0,出错-1

每个消息都有三部分组成,正长整型类型字段,非负长度(nbyte),以及实际数据字节(对应于长度)。消息总是放在队列尾端。

ptr参数指向一个长整型数,它包含了正的整型消息类型,在其后紧跟消息数据,若发送的最长消息是128字节,则可定义下列结构:

struct   mymesg

{

     long     mtype; //正的整型消息类型

     char     mtext[128];     //消息数据

};

ptr就是指向mymesg结构的指针。接受者可以使用消息类型以非先进先出的次序去消息。

参数flag的值可以指定为IPC_NOWAIT.这类似于文件I/O的非阻塞I/O标志。一般给0的话,即忽略。

当msgsnd成功返回,与消息队列相关的msqid_ds结构得到更新,以标明发出该调用的进程ID(msg_lspid),

进行该调用的时间(msg_stime),并指示队列中加了一条消息(msg_qnum)

 (4)msgrcv从队列中取消息;

    ssize_t    msgrcv(int   msqid,void *ptr,size_t  nbytes, long  type  , int  flag);

成功返回消息的数据部分的长度,出错返回-1;

ptr和msgsnd的prt一样;

nbyte:数据缓冲区的长度

type:我们可以指定想要哪一种消息:type == 0 返回队列中的第一个消息。

                                                             type > 0    返回队列中消息类型为type的第一个消息

                                                             type < 0     返回队列中消息类型值小于或等于type绝对值的消息,如果这种消息有若干个,则取类型值最小的消息。

当msgrcv成功返回,内核更新与消息队列相关的msqid_ds结构,以指示调用者进程ID(msg_lspid),

和调用的时间(msg_stime),并将队列中的消息数减一(msg_qnum)

下面简单代码实现消息队列:

创建消息队列,写入消息:

#include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <assert.h> #include <string.h> #include <sys/msg.h> struct mymesg //ptr指针指向的结构 { long mtype; char mtext[512]; }; void main() { int msgid = msgget((key_t)1234, 0664|IPC_CREAT); //打开消息队列,没有创建的话则创建 assert(msgid != -1); struct mymesg buf; buf.mtype = 1000; //消息类型1000 strcpy(buf.mtext, "hello"); //消息数据hello msgsnd(msgid, &buf, strlen(buf.mtext), 0); //将数据放到消息队列中 buf.mtype = 2000; //消息类型2000 strcpy(buf.mtext, "word");//消息数据word msgsnd(msgid, &buf, strlen(buf.mtext), 0);//将数据放到消息队列中 }

从消息队列中取消息:

#include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <assert.h> #include <string.h> #include <sys/msg.h> struct node { long type; char buff[128]; }; void main() { int msgid = msgget((key_t)1234, 0664|IPC_CREAT); assert(msgid != -1); struct node buf; memset(&buf, 0, sizeof(buf)); msgrcv(msgid, &buf, sizeof(buf) - 1, 2000, 0); //从消息队列中取类型为2000的消息 printf("buf.type: %d\n", buf.type); printf("buf.buff: %s\n", buf.buff); }

执行结果:

接下来我们看看信号量::

信号量与其他IPC不同的是,它是一个计数器,用于多进程对共享数据对象的访问。

为了获得共享资源,进程需要执行的操作:

(1)测试控制该资源的信号量。

(2)信号量的值为正时,则进程可以使用该资源。使用了,就将信号量减一(p操作)。

(3)信号量的值为0,则进程进入休眠状态,直至信号量的大于0。进程被唤醒后,返回至第(1)步

 当进程不再使用该资源时,信号量的值加一(v操作)。如果有进程正在休眠等待此信号量,则唤醒进程。

信号量的测试及加一减一操作都是原子操作,信号量通常是在内核中实现。

常用的信号量形式被称为二元信号量或双态信号量,控制单个资源,初始值为1,。但是一般而言,信号量的初值可以是任意正数值,这说明

有多少个共享资源单位可供共享应用。

内核为每个信号量集合设置一个semid_ds结构: struct semid_ds{ struct ipc_perm sem_perm; unsigned short sem_nsems; //信号量在信号量集中的编号 time_t sem_otime; 最后调用semop()的时间。 time_t sem_ctime; 最后进行change的时间。 .... } 每个信号量由一个无名结构表示,它至少包含下列成员。 struct{ unsigned short semval; //信号量值,>=0 pid_t sempid; //最后使用信号量的pid unsigned short semcnt; //等待semval变为大于其当前值的线程或进程数 unsigned short semzcnt; //等待semval变成0的线程或进程数 }

XSI的信号量要复杂一些:

(1)信号量并非是单个非负值,而必须将信号量定义为含有一个或多个信号量值的集合。当创建一个信号量时,要指定集合中信号量值的数量。

(2)创建信号量与对其赋初值是分开的。

(3)即使没有进程正在使用各种形式的XSI IPC,它们仍然是存在的。有些程序在终止时并没有释放已经分配给它的信号量。

关于信号量的函数::

(1)要获得一个信号量的ID,要调用的函数semget

#include <sys/sem.h>

int   semget(key_t   key,  int   nsems,  int   flag);

返回值:成功返回信号量的ID,若出错则返回-1

nsems是该集合中的信号量数。如果引用一个现存的集合,则将nsems指定为0。如果是创建新集合,则必须指定nsems

(2)semctl函数包含了多种信号量的操作

int   semctl(int   semid,  int    semnum ,int   cmd , /* union   semun arg */);

第四个参数是可选的,如果使用该参数,则其类型是semun,它是多个特定命令参数的联合(union);

union semun {      int val;       //for SETVAL      struct semid_ds *buf;    //for IPC_STAT and IPC_SET      unsigned short *array;   //for GETALL and SETALL };

cmd参数指定下列10种命令的一种, 在semid指定的信号量集合上执行此命令,其中有5条命令是针对一个特定的信号量值

的,他们用semnum指定该信号量集合中的一个成员。semnum值在0和nsems-1之间。(包含0和nsems-1) IPC_STAT:对此集合取semid_ds结构,并存放在由arg.buff指向的结构中。 IPC_SET:按由arg.buf指向结构中的值设置与此集合相关结构中的下面三个字段值:sem_perm.uid、sem_perm.gid和                  sem_perm.mode。此命令只能由下列两种进程执行:一种是其有效用户ID等于sem_perm.cuid或sem_perm.uid的进程,                  另一种是具有超级用户特权的进程。 IPC_RMID:从系统中删除该信号量集合。这种删除是立即发生的。仍在使用此信号集合的其他进程在他们下次试图对此信号量                    集合进行操作时,将出错返回EIDRM。此命令只能由下列两种进程执行:一种是其有效用户ID等于sem_perm.cuid或                    sem_perm.uid的进程;另一种是具有超级用户特权的进程。 GETVAL:返回成员semnum的semval值。 SETVAL:设置成员semnum的semval值。该值由arg.val指定。 GETPID:返回成员semnum的sempid值。 GETNCNT:返回成员semnum的semncnt值。 GETZCNT:返回成员semnum的semzcnt值。 GETALL:取该集合中所有信号量的值,并将他们存放在arg.array指向的数组中。 SETALL:按arg.array指向的数组中的值,设置该集合中所有信号量的值。 除GETALL以外的所有GET命令semctl函数都返回相应的值。其他命令返回值为0.

(3)函数semop自动执行信号量集合上的操作数组,这是个原子操作。

int    semop(int   semid,struct sembuf semoparray[ ] ,size_t  nops);

返回值:成功返回0,出错返回-1

参数semoparray是一个指针,它指向一个信号量操作数组,信号量操作由sembuf结构表示:

struct sembuf {      unsigned short sem_num;    //信号集中的成员号(0,1....,nsems-1)      short sem_op;    //加减操作      short sem_flg;    //IPC_NOWAIT, SEM_UNDO }; 参数nops规定该数组中的操作的数量(元素数)。 简单代码实现::我把信号量封装了一下。 信号量的声明:sem.h文件 #include <stdio.h> #include <stdlib.h> #include <assert.h> #include <string.h> #include <unistd.h> #include <sys/sem.h> union semun { int val; }; void seminit(); //初始化 void semp(); //减一操作 void semv(); //加一操作 void semdel(); //删除操作 sem.c文件 #include "sem.h" int semid = 0; void seminit() //初始化操作 { semid = semget((key_t)1234, 1, 0666); if(semid == -1) //打开现有的失败,创建信号量 { semid = semget((key_t)1234, 1, 0666 | IPC_CREAT); if(semid == -1) { perror(""); exit(0); } union semun v; v.val = 0; //设定初始值为0 semctl(semid, 0, SETVAL, v); } } void semp() //减一操作 { struct sembuf buf; buf.sem_num = 0; buf.sem_op = -1; buf.sem_flg = SEM_UNDO; semop(semid, &buf, 1); } void semv() //加一操作 { struct sembuf buf; buf.sem_num = 0; buf.sem_op = 1; buf.sem_flg = SEM_UNDO; semop(semid, &buf, 1); } void semdel() //删除信号量 { semctl(semid, 0, IPC_RMID, NULL); }

一个进程给一个文件里面写东西,当写的时候,另一个进程不能读,利用信号量 #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <assert.h> #include <string.h> #include <fcntl.h> #include "sem.h" void main() { int fd = open("a.txt", O_CREAT | O_WRONLY | O_TRUNC, 0664); assert(fd != -1); seminit(); //初始化信号量 while(1) { char buff[128] = {0}; printf("please input data: "); fflush(stdout); fgets(buff, 128, stdin); buff[strlen(buff) - 1] = 0; write(fd, buff, strlen(buff)); semv(); //写完给信号量值加一 if(strncmp(buff, "end", 3) == 0) { break; } } close(fd); } 写完了之后,信号量值加一,这时候读的那个进程就可执行了,读之前要对信号量值减一 #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <assert.h> #include <string.h> #include <fcntl.h> #include "sem.h" void main() { int fd = open("a.txt", O_CREAT | O_RDONLY , 0664); assert(fd != -1); seminit(); //初始化信号量 while(1) { char buff[128] = {0}; int n = 1, flag = 0; semp(); //减一操作 while(1) { n = read(fd, buff, 10); if(n == 0) { break; } if(strncmp(buff, "end", 3) == 0) { flag = 1; break; } printf("n = %d, buff = %s\n", n, buff); memset(buff, 0, 128); } if(flag) { break; } } close(fd); }

 共享存储::

共享存储允许两个或更多进程共享一给定的存储区。因为数据不需要在客户进程和服务器进程之间复制,所以共享存储是进程间通讯最快的一种。

使用共享存储必须是多进程之间同步访问共享存储区

即就是当我有一A进程正在将数据放入共享存储区,这个时候另一B进程是不能从共享存储区中去取数据,直到进程A写这一操作执行完

这里我用了信号量来实现进程的同步,信号量的操作利用上面信号量的代码::

简单代码实现如下:

#include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <assert.h> #include <string.h> #include <sys/shm.h> #include "sem.h" void main() { int shmid = shmget((key_t)1234, 128, 0664 | IPC_CREAT); //获得一个共享存储标识符 assert(shmid != -1); char *ptr = (char *)shmat(shmid, NULL, 0); //将共享存储连接到内核分配的一块空间上 assert(ptr != (char *)-1); seminit(); //初始化信号量 while(1) { printf("please input: "); fflush(stdout); fgets(ptr, 128, stdin); ptr[strlen(ptr) - 1] = 0; semv(); //信号量值加一 if(strncmp(ptr, "end", 3) == 0) { break; } } shmdt(ptr); //对共享存储的操作已结束 } #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <assert.h> #include <string.h> #include <sys/shm.h> #include "sem.h" void main() { int shmid = shmget((key_t)1234, 128, 0664 | IPC_CREAT); assert(shmid != -1); char *ptr = (char *)shmat(shmid, NULL, 0); assert(ptr != (char *)-1); seminit(); while(1) { semp(); if(strncmp(ptr, "end", 3) == 0) { break; } printf("s = %s\n", ptr); } shmdt(ptr); shmctl(shmid, IPC_RMID, NULL); //从系统中删除共享存储段 semdel(); //删除信号量 }

转载请注明原文地址: https://www.6miu.com/read-750127.html

最新回复(0)