Message Passing
Now that we can pass bytes between processes, the next step is to build a generalized message-passing interface. From now on, everything we send across the pipe has a standard format: a tuple containing the type of the message, the length of the body, and the body itself.
We’ll use the type as the index of an array of function pointers to process the message. To begin with, we define a type for our message-handling function:
typedef void (*message_handler)(void*);
This is a very simple function type; it takes a pointer and returns nothing. This plan restricts our message-handling functions to relying on global data or being entirely stateless. A more advanced implementation might also pass a struct containing some state information to each message handler.
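As a rough illustration of that more advanced variant, the sketch below pairs each handler with a small state struct and passes it in on every call. The names handler_state, stateful_handler, handler_entry, add_with_state, and dispatch are hypothetical and do not appear in Listing 3.

```c
#include <stddef.h>

/* Hypothetical per-handler state; not part of Listing 3. */
struct handler_state {
    unsigned long calls;  /* number of messages this handler has processed */
    long total;           /* running total of the results it has computed */
};

/* Handler type that receives its own state alongside the message body. */
typedef void (*stateful_handler)(void *body, struct handler_state *state);

/* One table entry pairs a handler with the state it owns. */
struct handler_entry {
    stateful_handler fn;
    struct handler_state state;
};

/* Example handler: adds two integers and records the result in its state. */
static void add_with_state(void *body, struct handler_state *state)
{
    int *numbers = body;
    state->calls++;
    state->total += numbers[0] + numbers[1];
}

/* Dispatch one message to the entry selected by its type.
   Entries without a handler are silently skipped in this sketch. */
static void dispatch(struct handler_entry *table, unsigned char type, void *body)
{
    if (table[type].fn != NULL) {
        table[type].fn(body, &table[type].state);
    }
}
```

With a zero-initialized table of 256 entries, setting table[1].fn = add_with_state and dispatching type 1 twice with the body {8, 6} leaves table[1].state.calls at 2 and table[1].state.total at 28. Because the state lives in the table rather than in globals, two entries can share one function without sharing data.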
To implement the message-sending operation, we use the writev(2) system call, which allows us to put the message type, length, and body into the pipe in a single system call. Unfortunately, we can’t do the same thing easily at the receiving end, because we have to retrieve the length of the body before we can receive it. We can avoid this problem by enforcing fixed-length messages, but this approach removes some flexibility from the system.
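To make the fixed-length trade-off concrete, here is a minimal sketch in which every message occupies the same number of bytes, so the receiver can fetch type and body with a single read(2). FIXED_BODY_SIZE, fixed_message, send_fixed, and receive_fixed are assumptions made for this illustration, and the sketch simply treats a short read or write as a failure.

```c
#include <string.h>
#include <unistd.h>

/* Hypothetical limit: every body is padded to this many bytes. */
#define FIXED_BODY_SIZE 64

/* Only unsigned char members, so the struct contains no padding. */
struct fixed_message {
    unsigned char type;
    unsigned char body[FIXED_BODY_SIZE];
};

/* Send one fixed-size message; returns 0 on success, -1 on failure.
   Bodies longer than FIXED_BODY_SIZE are rejected, not truncated. */
static int send_fixed(int fd, unsigned char type, const void *body, size_t length)
{
    struct fixed_message msg;

    if (length > FIXED_BODY_SIZE) {
        return -1;
    }
    memset(&msg, 0, sizeof msg);
    msg.type = type;
    if (length > 0) {
        memcpy(msg.body, body, length);
    }
    return write(fd, &msg, sizeof msg) == (ssize_t)sizeof msg ? 0 : -1;
}

/* Receive one fixed-size message with a single read; returns 0 or -1. */
static int receive_fixed(int fd, struct fixed_message *msg)
{
    return read(fd, msg, sizeof *msg) == (ssize_t)sizeof *msg ? 0 : -1;
}
```

The cost is visible in send_fixed: a body one byte larger than FIXED_BODY_SIZE cannot be sent at all, and a two-integer body still puts 65 bytes on the pipe.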
The example in Listing 3 spawns a child process that responds to two messages. Both take an array of two integers and print either the sum or the difference between the integers. Obviously, this mechanism shouldn’t be used for such a trivial purpose in the real world, since the cost of sending and receiving the message is far greater than the cost of processing it.
Listing 3 messages.c.
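#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/uio.h>
#include <sys/wait.h>
#include <unistd.h>

#define IN 0
#define OUT 1
#define HANDLER_COUNT 256   /* one slot for every possible type byte */

/* Create a pipe; returns an array of two descriptors, or NULL on failure. */
int *pipepair(void)
{
    int *pipes = calloc(2, sizeof(int));
    if (pipes != NULL && pipe(pipes) != 0) {
        free(pipes);
        pipes = NULL;
    }
    return pipes;
}

/* Message handler type */
typedef void (*message_handler)(void *);

/* Message handlers */
void default_handler(void *unused)
{
    (void)unused;
    fprintf(stderr, "Unrecognised message type received\n");
}

void add(void *body)
{
    int *numbers = body;
    printf("%d + %d = %d\n", numbers[0], numbers[1], numbers[0] + numbers[1]);
}

void subtract(void *body)
{
    int *numbers = body;
    printf("%d - %d = %d\n", numbers[0], numbers[1], numbers[0] - numbers[1]);
}

/* Dispatch messages until a type 0 message or end of input arrives. */
void message_loop(message_handler *handlers, int pipe)
{
    unsigned char type;   /* unsigned, so it is always a valid table index */
    unsigned int length;
    unsigned int buffersize = 1024;
    char *message = malloc(buffersize);
    struct iovec header[2] = {{&type, sizeof(type)}, {&length, sizeof(length)}};

    while (message != NULL) {
        if (readv(pipe, header, 2) <= 0 || type == 0) {
            break;
        }
        if (length > buffersize) {
            char *bigger = realloc(message, length);
            if (bigger == NULL) {
                break;
            }
            message = bigger;
            buffersize = length;
        }
        /* May return fewer than length bytes; see the note after the listing. */
        if (read(pipe, message, length) < 0) {
            break;
        }
        handlers[type](message);
    }
    free(message);
}

/* Write type, length, and body to the pipe with a single system call. */
void send_message(int pipe, unsigned char type, void *body, unsigned int length)
{
    struct iovec message[3] = {{&type, sizeof(type)}, {&length, sizeof(length)}, {body, length}};
    if (writev(pipe, message, 3) < 0) {
        perror("writev");
    }
}

/* Allocate a handler table with every slot set to default_handler. */
message_handler *new_handlers(void)
{
    message_handler *handlers = calloc(HANDLER_COUNT, sizeof(message_handler));
    for (unsigned int i = 0; handlers != NULL && i < HANDLER_COUNT; i++) {
        handlers[i] = default_handler;
    }
    return handlers;
}

int main(void)
{
    int *pipes = pipepair();
    if (pipes == NULL) {
        return EXIT_FAILURE;
    }
    pid_t pid = fork();
    if (pid < 0) {
        return EXIT_FAILURE;
    }
    if (pid == 0) {
        /* Child: handle messages until told to stop. */
        close(pipes[OUT]);
        message_handler *handlers = new_handlers();
        if (handlers == NULL) {
            return EXIT_FAILURE;
        }
        handlers[1] = add;
        handlers[2] = subtract;
        message_loop(handlers, pipes[IN]);
        free(handlers);
    } else {
        /* Parent: send two requests, then the type 0 stop message. */
        close(pipes[IN]);
        int body[2] = {8, 6};
        send_message(pipes[OUT], 1, body, sizeof(body));
        send_message(pipes[OUT], 2, body, sizeof(body));
        send_message(pipes[OUT], 0, NULL, 0);
        waitpid(pid, NULL, 0);
    }
    free(pipes);
    return 0;
}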
Note that the read(2) call isn’t guaranteed to return the requested number of bytes. It’s possible that an implementation could return part of the message. To eliminate this risk, the read(2) call should be wrapped in a loop that checks the return value and attempts to read the rest if a message is only partially received. This implementation is left as an exercise for the reader.
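For readers who want to check their answer, one possible shape for that loop is sketched below. The helper name read_full is hypothetical. The sketch assumes a blocking descriptor and treats end of input before the full count as an error.

```c
#include <errno.h>
#include <unistd.h>

/* Read exactly count bytes from fd into buffer.
   Returns 0 on success, or -1 on a read error or if end of input
   arrives before count bytes have been received. */
static int read_full(int fd, void *buffer, size_t count)
{
    char *position = buffer;

    while (count > 0) {
        ssize_t received = read(fd, position, count);
        if (received < 0) {
            if (errno == EINTR) {
                continue;   /* interrupted by a signal: try again */
            }
            return -1;      /* genuine read error */
        }
        if (received == 0) {
            return -1;      /* writer closed the pipe mid-message */
        }
        position += received;        /* skip past what has arrived */
        count -= (size_t)received;   /* and ask only for the remainder */
    }
    return 0;
}
```

In Listing 3, the call read(pipe, message, length) could then become read_full(pipe, message, length), with a nonzero return ending the message loop. The header read with readv(2) can also come up short, so a fuller fix would read the type and length through the same helper.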