/*
 *  pipes.c
 *  
 *  A set of processes randomly messaging each other through pipes.
 *
 *
 *  Created by Mij <mij@bitchx.it> on 05/01/05.
 *  Original source file available on http://mij.oltrelinux.com/devel/unixprg/
 *
 */


/* ... */
#include <stdio.h>
/* for read() and write() */
#include <sys/types.h>
#include <sys/uio.h>
/* for strlen and others */
#include <string.h>
/* for pipe() */
#include <unistd.h>
/* for [s]random() */
#include <stdlib.h>
/* for time() [seeding srandom()] */
#include <time.h>
/* for kill(), signal() and the SIG* constants */
#include <signal.h>


/* number of processes involved, parent included: 1 < PROCS_NUM <= 255 */
#define PROCS_NUM               15
/* size in bytes of a message payload buffer, terminating '\0' included */
#define MAX_PAYLOAD_LENGTH      50
/* value stored over a dead process' file descriptors to mark them unusable */
#define DEAD_PROC               (-1)


        /*      ***                     DATA TYPES                  ***     */
/*
 * A process address: the index of the process' slot in proc_pipes[].
 * Explicitly unsigned, because plain char may be signed: addresses above 127
 * (the documented limit is 255 processes) would otherwise turn into negative
 * array indices.  Still exactly one byte, so it fits the one-byte message
 * header.
 */
typedef unsigned char proc_addr;

/* A message as handed back to the caller by receive_proc_message(). */
struct message_s {
    /* address taken from the header byte that precedes the payload */
    proc_addr src_id;
    /* length of the payload; NOTE(review): confirm the code filling the
       struct actually sets this field before relying on it */
    short length;
    /* NUL-terminated text, allocated with malloc() by
       receive_proc_message(); presumably the caller is meant to free() it */
    char *payload;
};


        /*      ***                     FUNCTION PROTOTYPES         ***     */
/*
 * Send the NUL-terminated text `message` to the process with id `dest` by
 * writing it to that process' pipe.  Callers treat a return value > 0 as
 * success and anything else as "recipient unavailable".
 */
int send_proc_message(proc_addr dest, char *message);
/*
 * Read one message from the calling process' own pipe (selected through
 * my_address) into *msg.  A negative return value means no complete message
 * could be read.
 */
int receive_proc_message(struct message_s *msg);
/*
 * Mark both file descriptors of `process` in proc_pipes[] with DEAD_PROC.
 * Only the table entries are overwritten; nothing is close()d here.
 */
void mark_proc_closed(proc_addr process);


        /*              ***             GLOBAL VARS                 ***     */
/* they are OK to be global here. */
/* Address of the running process: 0 in the parent; each child gets the value
   the parent set right before the fork() that created it (1, 2, ...). */
proc_addr my_address;                   /* stores the id of the process */
/* One pipe per process, indexed by process address.  [i][0] is the read end
   of process i's pipe and [i][1] the write end, as filled in by pipe(). */
int proc_pipes[PROCS_NUM][2];           /* stores the pipes of every process involved */


int main(int argc, char *argv[])
{
    pid_t child_pid;
    pid_t my_children[PROCS_NUM];               /* PIDs of the children */
    int i, ret;
    char msg_text[MAX_PAYLOAD_LENGTH];       /* payload of the message to send */
    proc_addr msg_recipient;                    
    struct message_s msg;
    
    
    
    /* create a pipe for me (the parent) */
    pipe(proc_pipes[0]);

    /* initializing proc_pipes struct */
    for (i = 1; i < PROCS_NUM; i++) {
        /* creating one pipe for every (future) process */
        ret = pipe(proc_pipes[i]);
        if (ret) {
            perror("Error creating pipe");
            abort();
        }        
    }
        
    
    /* fork [1..NUM_PROCS] children. 0 is me. */
    for (i = 1; i < PROCS_NUM; i++) {
        /* setting the child address */
        my_address = my_address + 1;
        
        child_pid = fork();
        if (! child_pid) {
            /* child */
            sleep(1);

            /* closing other process' pipes read ends */
            for (i = 0; i < PROCS_NUM; i++) {
                if (i != my_address)
                    close(proc_pipes[i][0]);
            }
            
            
            /* init random num generator */
            srandom(time(NULL));

            /* my_address is now my address, and will hereby become a "constant" */
            /* producing some message for the other processes */
            while (random() % (2*PROCS_NUM)) {
                /* interleaving... */
                sleep((unsigned int)(random() % 2));
                
                /* choosing a random recipient (including me) */
                msg_recipient = (proc_addr)(random() % PROCS_NUM);
                
                /* preparing and sending the message */
                sprintf(msg_text, "hello from process %u.", (int)my_address);
                ret = send_proc_message(msg_recipient, msg_text);
                if (ret > 0) {
                    /* message has been correctly sent */
                    printf("    --> %d: sent message to %u\n", my_address, msg_recipient);
                } else {
                    /* the child we tried to message does no longer exist */
                    mark_proc_closed(msg_recipient);
                    printf("    --> %d: recipient %u is no longer available\n", my_address, msg_recipient);
                }
            }
            
            /* now, reading the first 2 messages we've been sent */
            for (i = 0; i < 2; i++) {
                ret = receive_proc_message(&msg);
                if (ret < 0) break;
                printf("<--     Process %d, received message from %u: \"%s\".\n", my_address, msg.src_id, msg.payload);
            };
            
            /* i'm exiting. making my pipe widowed */
            close(proc_pipes[my_address][0]);
            
            printf("# %d: i am exiting.\n", my_address);
            exit(0);
        }
        
        /* saving the child pid (for future killing) */
        my_children[my_address] = child_pid;

        /* parent. I don't need the read descriptor of the pipe */
        close(proc_pipes[my_address][0]);
        
        /* this is for making srandom() consistent */
        sleep(1);
    }
    
    /* expecting the user request to terminate... */
    printf("Please press ENTER when you like me to flush the children...\n");
    getchar();

    printf("Ok, terminating dandling processes...\n");
    /* stopping freezed children */
    for (i = 1; i < PROCS_NUM; i++) {
        kill(my_children[i], SIGTERM);
    }
    printf("Done. Exiting.\n");
    
    return 0;
}


/*
 * Send the NUL-terminated text `message` to the process with address `dest`.
 *
 * The packet written to the recipient's pipe is:
 *     1 header byte  - the SENDER's address (my_address); the receiver
 *                      stores this byte as the message's source
 *     the payload    - the characters of `message`
 *     1 byte         - the terminating '\0'
 *
 * The packet is assembled in one buffer and handed to a single write() call,
 * so that it cannot be interleaved with packets of other senders as long as
 * it does not exceed PIPE_BUF.
 *
 * Returns the number of bytes written (> 0) on success.  Returns -1 when
 * `message` is NULL, `dest` is not a valid address, the recipient has been
 * marked DEAD_PROC, the payload does not fit in MAX_PAYLOAD_LENGTH bytes
 * (terminator included), or write() fails.
 */
int send_proc_message(proc_addr dest, char *message)
{
    char packet[1 + MAX_PAYLOAD_LENGTH];    /* header + payload + '\0' */
    size_t payload_len;
    int slot = (int)dest;
    int fd;

    if (message == NULL || slot < 0 || slot >= PROCS_NUM) {
        return -1;
    }

    fd = proc_pipes[slot][1];
    if (fd == DEAD_PROC) {
        return -1;
    }

    /* the receiving side buffers at most MAX_PAYLOAD_LENGTH bytes,
       terminator included: refuse what it could not take */
    payload_len = strlen(message);
    if (payload_len >= (size_t)MAX_PAYLOAD_LENGTH) {
        return -1;
    }

    packet[0] = (char)my_address;
    memcpy(&packet[1], message, payload_len + 1);

    /* the length is computed from the payload, not with strlen() on the
       packet: the header byte may legitimately be 0 */
    return (int)write(fd, packet, payload_len + 2);
}


/*
 * Read one message from the calling process' own pipe
 * (proc_pipes[my_address][0]) into *msg.
 *
 * Expected packet layout: one header byte (stored in msg->src_id), then the
 * payload characters, then a terminating '\0'.
 *
 * On success returns 0 with msg->payload pointing to a malloc()ed,
 * NUL-terminated copy of the payload (the caller must free() it) and
 * msg->length set to its length without the terminator.
 *
 * Returns -1, with msg->payload set to NULL, when msg is NULL (nothing is
 * touched in that case), on end-of-file or read error, when the payload does
 * not fit in MAX_PAYLOAD_LENGTH bytes (terminator included), or when memory
 * cannot be allocated.  This call blocks while the pipe is empty and some
 * process still holds its write end open.
 */
int receive_proc_message(struct message_s *msg)
{
    char buffer[MAX_PAYLOAD_LENGTH];
    char c;
    size_t count = 0;           /* payload bytes seen so far, '\0' included */
    ssize_t got;
    int fd;

    if (msg == NULL) {
        return -1;
    }
    msg->payload = NULL;
    msg->length = 0;

    fd = proc_pipes[my_address][0];

    /* first, getting the message sender */
    got = read(fd, &c, 1);
    if (got <= 0) {
        return -1;
    }
    msg->src_id = (proc_addr)c;

    /* then the payload, one byte at a time up to its terminator.  A byte is
       only looked at when read() really delivered it. */
    for (;;) {
        got = read(fd, &c, 1);
        if (got <= 0) {
            return -1;
        }
        if (count < sizeof buffer) {
            buffer[count] = c;
        }
        /* bytes beyond the buffer are still consumed (and dropped), so that
           the next call starts at the next packet's header */
        count++;
        if (c == '\0') {
            break;
        }
    }

    if (count > sizeof buffer) {
        /* payload too long: it has been discarded */
        return -1;
    }

    /* msg correctly received. Preparing message packet */
    msg->payload = malloc(count);
    if (msg->payload == NULL) {
        return -1;
    }
    memcpy(msg->payload, buffer, count);
    msg->length = (short)(count - 1);

    return 0;
}


/*
 * Flag the pipe of `process` as unusable by overwriting both of its
 * descriptors in proc_pipes[] with DEAD_PROC.  The descriptors themselves
 * are not close()d here.
 */
void mark_proc_closed(proc_addr process)
{
    int *ends = proc_pipes[process];
    int which;

    /* index 0 and index 1: the two descriptors stored for the process */
    for (which = 0; which < 2; which++) {
        ends[which] = DEAD_PROC;
    }
}