#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <sys/types.h>
#include <sys/msg.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <signal.h>
#include <string.h>
#define OPT_INCR 0
#define OPT_INCRBY 1
#define OPT_DECR 2
#define OPT_DECRBY 3
#define DB_SIZE 10
#define CHILD_COUNT 4
int msgid = 1;
int main_running = 1;
int sub_running = 1;
int pids[CHILD_COUNT];
struct msg_t {
int opt;
int index;
int data;
};
void opt_handler(int * db, struct msg_t * msg) {
switch (msg->opt) {
case OPT_INCR:
db[msg->index] ++;
break;
case OPT_INCRBY:
db[msg->index] += msg->data;
break;
case OPT_DECR:
db[msg->index] --;
break;
case OPT_DECRBY:
db[msg->index] -= msg->data;
break;
deafult:
printf("Not support opt: %d\n", msg->opt);
}
}
void child_process(int * shm_db, int msgid) {
struct msg_t msg;
struct msqid_ds msg_info;
while (1) {
sleep(0.5);
msgctl(msgid, IPC_STAT, &msg_info);
if (msg_info.msg_qnum <= 0) {
//sleep(CHILD_COUNT);
if (!sub_running) break;
continue;
}
if (msgrcv(msgid, (struct msg_t *)&msg, sizeof(&msg), 0, 0) == -1) {
fprintf(stderr, "subpid[%d]: msgcrv failed\n", getpid());
continue;
}
printf("GET[%d]: %d, %d, %d\n", getpid(), msg.opt, msg.index, msg.data);
opt_handler(shm_db, &msg);
}
exit(EXIT_SUCCESS);
}
void show_db(int * db, int * shm_db) {
int i;
printf("index\tlocal val\tshm val\n");
for (i = 0; i < DB_SIZE; i++) {
printf("%d\t%d\t\t%d\n", i, db[i], shm_db[i]);
}
}
void sub_usr1_handler() {
sub_running = 0;
}
void term_handler() {
printf("I got SIGTERM...\n");
main_running = 0;
int i;
for (i = 0; i < CHILD_COUNT; i++) {
kill(pids[i], SIGUSR1);
}
}
int main() {
int db[DB_SIZE], *shm_db, i, status=0, wpid;
key_t mapKey = (key_t)1234, msgKey = (key_t)4321;
struct msg_t msg;
char *pidfile = "tmp.pid";
int pid = getpid();
char pid_s[8];
sprintf(pid_s, "%d", pid);
FILE * fp = fopen(pidfile, "w");
if (fp == NULL) {
fprintf(stderr, "open %s file failed.\n", pidfile);
exit(EXIT_FAILURE);
}
fwrite(pid_s, sizeof(pid_s), 1, fp);
fclose(fp);
srand((unsigned)time(NULL));
msgid = msgget(msgKey, 0666 | IPC_CREAT);
if (msgid == -1) {
fprintf(stderr, "msgget error.\n");
exit(EXIT_FAILURE);
}
int shmid = shmget(mapKey, sizeof(db), IPC_CREAT);
if (shmid == -1) {
fprintf(stderr, "shmget error.\n");
exit(EXIT_FAILURE);
}
shm_db = (int *)shmat(shmid, NULL, 0);
memset(db, 0, sizeof(db));
memset(shm_db, 0, sizeof(db));
struct sigaction subsig;
subsig.sa_handler = sub_usr1_handler;
sigemptyset(&subsig.sa_mask);
subsig.sa_flags = SA_RESETHAND;
sigaction(SIGUSR1, &subsig, 0);
for (i = 0; i < CHILD_COUNT; i++) {
if ((pids[i] =fork()) == 0) {
child_process(shm_db, msgid);
}
}
struct sigaction sig;
sig.sa_handler = term_handler;
sigemptyset(&sig.sa_mask);
sig.sa_flags = SA_RESETHAND;
sigaction(SIGTERM, &sig, 0);
show_db(db, shm_db);
while (main_running) {
msg.opt = rand()%4;
msg.index = rand()%8 + 1;
msg.data = rand()%10 + 1;
printf("POST[%d]: %d, %d, %d\n", getpid(), msg.opt, msg.index, msg.data);
if (msgsnd(msgid, &msg, sizeof(&msg), 0) == -1) {
fprintf(stderr, "msgsnd failed\n");
continue;
}
opt_handler(db, &msg);
sleep(1);
}
while( (wpid = wait(&status)) > 0) {
printf("subproc[%d] exit.\n", wpid);
}
show_db(db, shm_db);
if (msgctl(msgid, IPC_RMID, 0) == -1) {
fprintf(stderr, "msgctl error.\n");
exit(EXIT_FAILURE);
}
if (shmdt(shm_db) == -1) {
fprintf(stderr, "shm detach error\n");
exit(EXIT_FAILURE);
}
if (shmctl(shmid, IPC_RMID, 0) == -1) {
fprintf(stderr, "shm remove failed.\n");
exit(EXIT_FAILURE);
}
exit(EXIT_SUCCESS);
}