/*
* pipes.c
*
* A set of processes randomly messaging each the other, with pipes.
*
*
* Created by Mij <mij@bitchx.it> on 05/01/05.
* Original source file available on http://mij.oltrelinux.com/devel/unixprg/
*
*/
/* ... */
#include <stdio.h>
/* for read() and write() */
#include <sys/types.h>
#include <sys/uio.h>
/* for strlen and others */
#include <string.h>
/* for pipe() */
#include <unistd.h>
/* for [s]random() */
#include <stdlib.h>
/* for time() [seeding srandom()] */
#include <time.h>
#define PROCS_NUM 15 /* 1 < number of processes involved <= 255 */
#define MAX_PAYLOAD_LENGTH 50 /* message length */
#define DEAD_PROC -1 /* a value to mark a dead process' file descriptors with */
/* *** DATA TYPES *** */
/* a process address */
typedef char proc_addr;
/* a message */
struct message_s {
proc_addr src_id;
short int length;
char *payload;
};
/* *** FUNCTION PROTOTYPES *** */
/* send message to process with id dest */
int send_proc_message(proc_addr dest, char *message);
/* receive a message in the process' queue of received ones */
int receive_proc_message(struct message_s *msg);
/* mark process file descriptors closed */
void mark_proc_closed(proc_addr process);
/* *** GLOBAL VARS *** */
/* they are OK to be global here. */
proc_addr my_address; /* stores the id of the process */
int proc_pipes[PROCS_NUM][2]; /* stores the pipes of every process involved */
int main(int argc, char *argv[])
{
pid_t child_pid;
pid_t my_children[PROCS_NUM]; /* PIDs of the children */
int i, ret;
char msg_text[MAX_PAYLOAD_LENGTH]; /* payload of the message to send */
proc_addr msg_recipient;
struct message_s msg;
/* create a pipe for me (the parent) */
pipe(proc_pipes[0]);
/* initializing proc_pipes struct */
for (i = 1; i < PROCS_NUM; i++) {
/* creating one pipe for every (future) process */
ret = pipe(proc_pipes[i]);
if (ret) {
perror("Error creating pipe");
abort();
}
}
/* fork [1..NUM_PROCS] children. 0 is me. */
for (i = 1; i < PROCS_NUM; i++) {
/* setting the child address */
my_address = my_address + 1;
child_pid = fork();
if (! child_pid) {
/* child */
sleep(1);
/* closing other process' pipes read ends */
for (i = 0; i < PROCS_NUM; i++) {
if (i != my_address)
close(proc_pipes[i][0]);
}
/* init random num generator */
srandom(time(NULL));
/* my_address is now my address, and will hereby become a "constant" */
/* producing some message for the other processes */
while (random() % (2*PROCS_NUM)) {
/* interleaving... */
sleep((unsigned int)(random() % 2));
/* choosing a random recipient (including me) */
msg_recipient = (proc_addr)(random() % PROCS_NUM);
/* preparing and sending the message */
sprintf(msg_text, "hello from process %u.", (int)my_address);
ret = send_proc_message(msg_recipient, msg_text);
if (ret > 0) {
/* message has been correctly sent */
printf(" --> %d: sent message to %u\n", my_address, msg_recipient);
} else {
/* the child we tried to message does no longer exist */
mark_proc_closed(msg_recipient);
printf(" --> %d: recipient %u is no longer available\n", my_address, msg_recipient);
}
}
/* now, reading the first 2 messages we've been sent */
for (i = 0; i < 2; i++) {
ret = receive_proc_message(&msg);
if (ret < 0) break;
printf("<-- Process %d, received message from %u: \"%s\".\n", my_address, msg.src_id, msg.payload);
};
/* i'm exiting. making my pipe widowed */
close(proc_pipes[my_address][0]);
printf("# %d: i am exiting.\n", my_address);
exit(0);
}
/* saving the child pid (for future killing) */
my_children[my_address] = child_pid;
/* parent. I don't need the read descriptor of the pipe */
close(proc_pipes[my_address][0]);
/* this is for making srandom() consistent */
sleep(1);
}
/* expecting the user request to terminate... */
printf("Please press ENTER when you like me to flush the children...\n");
getchar();
printf("Ok, terminating dandling processes...\n");
/* stopping freezed children */
for (i = 1; i < PROCS_NUM; i++) {
kill(my_children[i], SIGTERM);
}
printf("Done. Exiting.\n");
return 0;
}
int send_proc_message(proc_addr dest, char *message)
{
int ret;
char *msg = (char *)malloc(sizeof(message) + 2);
/* the write should be atomic. Doing our best */
msg[0] = (char)dest;
memcpy((void *)&(msg[1]), (void *)message, strlen(message)+1);
/* send message, including the "header" the trailing '\0' */
ret = write(proc_pipes[dest][1], msg, strlen(msg)+2);
free(msg);
return ret;
}
int receive_proc_message(struct message_s *msg)
{
char c = 'x';
char temp_string[MAX_PAYLOAD_LENGTH];
int ret, i = 0;
/* first, getting the message sender */
ret = read(proc_pipes[my_address][0], &c, 1);
if (ret == 0) {
return 0;
}
msg->src_id = (proc_addr)c;
do {
ret = read(proc_pipes[my_address][0], &c, 1);
temp_string[i++] = c;
} while ((ret > 0) && (c != '\0') && (i < MAX_PAYLOAD_LENGTH));
if (c == '\0') {
/* msg correctly received. Preparing message packet */
msg->payload = (char *)malloc(strlen(temp_string) + 1);
strncpy(msg->payload, temp_string, strlen(temp_string) + 1);
return 0;
}
return -1;
}
void mark_proc_closed(proc_addr process)
{
proc_pipes[process][0] = DEAD_PROC;
proc_pipes[process][1] = DEAD_PROC;
}