进程间的通信
当程序运行时,父进程 fork 出 4 个子进程。父进程负责产生消息(每 1s 产生一个消息), 4 个子进程负责处理消息。父子进程之间通过消息队列来传递消息。父进程需要维护一个本地数据库(格式与共享数据库相同),当生成一个消息时,父进程要同步更新本地数据库。子进程在处理消息时,根据消息的内容来对共享数据库进行更新(比如,当一个子进程收到一个的消息时,需要将共享数据库中 index 为 2 的条目的 count 值递增 1 ),并延迟 500ms 。
父进程需要处理 SIGTERM 信号。父进程接收到 SIGTERM 信号意味着要求结束该程序。在结束程序之前,父进程要结束所有子进程、显示并对比本地数据库和共享数据库中的信息(两者应该相同)。 有点意思,好好写 本帖最后由 json 于 2017-7-7 11:49 编辑
看了一下题意,不是特别理解(估计是我个人问题,哈哈)。写了一个demo,还有许多问题,暂时先抛砖引玉啦~
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <sys/types.h>
#include <sys/msg.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <signal.h>
#include <string.h>
#define OPT_INCR 0
#define OPT_INCRBY 1
#define OPT_DECR 2
#define OPT_DECRBY 3
#define DB_SIZE 10
#define CHILD_COUNT 4
int msgid = 1;
int main_running = 1;
int sub_running = 1;
int pids;
struct msg_t {
int opt;
int index;
int data;
};
void opt_handler(int * db, struct msg_t * msg) {
switch (msg->opt) {
case OPT_INCR:
db ++;
break;
case OPT_INCRBY:
db += msg->data;
break;
case OPT_DECR:
db --;
break;
case OPT_DECRBY:
db -= msg->data;
break;
deafult:
printf("Not support opt: %d\n", msg->opt);
}
}
void child_process(int * shm_db, int msgid) {
struct msg_t msg;
struct msqid_ds msg_info;
while (1) {
sleep(0.5);
msgctl(msgid, IPC_STAT, &msg_info);
if (msg_info.msg_qnum <= 0) {
//sleep(CHILD_COUNT);
if (!sub_running) break;
continue;
}
if (msgrcv(msgid, (struct msg_t *)&msg, sizeof(&msg), 0, 0) == -1) {
fprintf(stderr, "subpid[%d]: msgcrv failed\n", getpid());
continue;
}
printf("GET[%d]: %d, %d, %d\n", getpid(), msg.opt, msg.index, msg.data);
opt_handler(shm_db, &msg);
}
exit(EXIT_SUCCESS);
}
void show_db(int * db, int * shm_db) {
int i;
printf("index\tlocal val\tshm val\n");
for (i = 0; i < DB_SIZE; i++) {
printf("%d\t%d\t\t%d\n", i, db, shm_db);
}
}
void sub_usr1_handler() {
sub_running = 0;
}
void term_handler() {
printf("I got SIGTERM...\n");
main_running = 0;
int i;
for (i = 0; i < CHILD_COUNT; i++) {
kill(pids, SIGUSR1);
}
}
int main() {
int db, *shm_db, i, status=0, wpid;
key_t mapKey = (key_t)1234, msgKey = (key_t)4321;
struct msg_t msg;
char *pidfile = "tmp.pid";
int pid = getpid();
char pid_s;
sprintf(pid_s, "%d", pid);
FILE * fp = fopen(pidfile, "w");
if (fp == NULL) {
fprintf(stderr, "open %s file failed.\n", pidfile);
exit(EXIT_FAILURE);
}
fwrite(pid_s, sizeof(pid_s), 1, fp);
fclose(fp);
srand((unsigned)time(NULL));
msgid = msgget(msgKey, 0666 | IPC_CREAT);
if (msgid == -1) {
fprintf(stderr, "msgget error.\n");
exit(EXIT_FAILURE);
}
int shmid = shmget(mapKey, sizeof(db), IPC_CREAT);
if (shmid == -1) {
fprintf(stderr, "shmget error.\n");
exit(EXIT_FAILURE);
}
shm_db = (int *)shmat(shmid, NULL, 0);
memset(db, 0, sizeof(db));
memset(shm_db, 0, sizeof(db));
struct sigaction subsig;
subsig.sa_handler = sub_usr1_handler;
sigemptyset(&subsig.sa_mask);
subsig.sa_flags = SA_RESETHAND;
sigaction(SIGUSR1, &subsig, 0);
for (i = 0; i < CHILD_COUNT; i++) {
if ((pids =fork()) == 0) {
child_process(shm_db, msgid);
}
}
struct sigaction sig;
sig.sa_handler = term_handler;
sigemptyset(&sig.sa_mask);
sig.sa_flags = SA_RESETHAND;
sigaction(SIGTERM, &sig, 0);
show_db(db, shm_db);
while (main_running) {
msg.opt = rand()%4;
msg.index = rand()%8 + 1;
msg.data = rand()%10 + 1;
printf("POST[%d]: %d, %d, %d\n", getpid(), msg.opt, msg.index, msg.data);
if (msgsnd(msgid, &msg, sizeof(&msg), 0) == -1) {
fprintf(stderr, "msgsnd failed\n");
continue;
}
opt_handler(db, &msg);
sleep(1);
}
while( (wpid = wait(&status)) > 0) {
printf("subproc[%d] exit.\n", wpid);
}
show_db(db, shm_db);
if (msgctl(msgid, IPC_RMID, 0) == -1) {
fprintf(stderr, "msgctl error.\n");
exit(EXIT_FAILURE);
}
if (shmdt(shm_db) == -1) {
fprintf(stderr, "shm detach error\n");
exit(EXIT_FAILURE);
}
if (shmctl(shmid, IPC_RMID, 0) == -1) {
fprintf(stderr, "shm remove failed.\n");
exit(EXIT_FAILURE);
}
exit(EXIT_SUCCESS);
}
页:
[1]