Treść:
Jeden proces (master) oczekuje na pojawienie się podległych mu procesów (slaves). Za pomocą dodatkowego procesu możemy startować podległe procesy. Nowy proces wysyła masterowi sygnał, że się pojawił i poprzez kolejkę komunikatów przekazuje mu namiary na siebie. Od tego momentu master zaczyna komunikować się z nowopowstałym slave’wem periodycznie, według wzorca pytanie/odpowiedź poprzez named pipes, co slave uwidacznia.
Rozwiązanie składa się z dwóch programów: master.cpp i slave.cpp
Master musi być odpalony pierwszy (tworzy kolejkę komunikatów i czeka na sygnał). Master musi być odpalany z nazwą 'masterProcess’ – do procesu o takiej nazwie 'slave’ wysyła sygnał.
Dodałem sporo komentarzy tłumaczących co i jak działa.
master.cpp (do pobrania: http://paste.ots.me/404783 )
// Master
// kompilacja: g++ master.cpp -lstdc++ -o masterProcess && ./masterProcess
/*
Jeden proces (master) oczekuje na pojawienie się podległych mu procesów (slaves). Za pomocą dodatkowego procesu możemy startować podległe procesy. Nowy proces wysyła masterowi sygnał, że się pojawił i poprzez kolejkę komunikatów przekazuje mu namiary na siebie. Od tego momentu master zaczyna komunikować się z nowopowstałym slave’wem periodycznie, według wzorca pytanie/odpowiedź poprzez named pipes, co slave uwidacznia.
*/
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include <fcntl.h>
#include <signal.h>
#include <sys/time.h>
#include <sys/msg.h>
#define handle_error(msg) do { perror(msg); exit(EXIT_FAILURE); } while (0)
// prosta funkcja zwracajaca czas
struct timeval czas;
int ret; // zmienna przyjmujaca zwracana wartosc z paru funkcji w programie
char* getTime()
{
ret = gettimeofday(&czas, NULL);
char* buffer = (char *) malloc(sizeof(char) * 18);
if(ret == 0)
{
sprintf(buffer, "%ld.%06ld:", czas.tv_sec, czas.tv_usec);
}
else
{
strcpy(buffer, "Error:");
}
return buffer;
}
// funkcja obslugujaca przychodzace sygnaly (tylko SIGINT)
// sygnaly sa zliczane, a nie zapamietywane jako 0 lub 1 ('czy przyszedl'),
// bo moze przyjsc kilka zanim zacznie sie ich obsluga w 'main'
int otrzymalSygnalow = 0;
static void obslugaSygnalu(int signum)
{
otrzymalSygnalow += 1;
printf("Otrzymano sygnal SIGINT\n");
}
// struktura do przesylania przez kolejke komunikatow (FIFO)
struct Komunikat
{
// pierwszy w strukturze MUSI byc 'long int' [o dowolnej nazwie],
// w nim bedzie przechowywany 'typ' komunikatu
// w funkcji ktora pobieramy komunikaty:
// msgrcv(kolejkaKomunikatowID, &komunikat, sizeof(struct Komunikat), 1, 0);
// ta '1' to wlasnie typ komunikatu jaki chcemy otrzymac
// (proces 'slave' przed wyslaniem komunikatu ustawia w strukturze typ na '1')
long int typeKomunikatu;
long int pid;
};
// limit obslugiwanych slave, tej wartosci uzywamy przy tworzeniu tablic z wejsciami i wyjsciami named-pipe
int iloscMaksymalnaSlave = 30;
// czas miedzy sprawdzeniami czy jest jakis sygnal
int czasPomiedzyObslugaSygnalow = 1000; // 0.001 sec - 1 ms
// czas pomiedzy obsluga sygnalow i periodyczna komunikacja w mikrosekundach
int czasPeriodycznejKomunikacji = 500000; // 0.5 sec
int main(int argc, char *argv[])
{
int kolejkaKomunikatowID;
int i; // iterator do 'for'
Komunikat komunikat; // komunikat przesylany przez kolejke komunikatow
char buffer [50]; // taki sam rozmiar jak w slave
int iloscSlaveow = 0; // ilosc juz obslugiwanych slave
int wejscie[iloscMaksymalnaSlave]; // named-pipe wejscia od slave
int wyjscie[iloscMaksymalnaSlave]; // named-pipe wyjscia do slave
int czasOdOstatniejPeriodycznejKomunikacji = 0; // zlicza czas od ostatniej periodycznej komunikacji z slave
// ustawianie funkcji 'obslugaSygnalu' jako tej ktora obsluguje sygnal 'SIGINT' (ctrl+c)
struct sigaction akcja;
akcja.sa_handler = obslugaSygnalu;
akcja.sa_flags = 0;
sigaction(SIGINT, &akcja, NULL);
// koniec ustawiania
// tworze lub pobieram kolejke komunikatow o ID 123
// jako, ze mogla juz istniec i cos w niej moglo byc to lepiej ja..
if((kolejkaKomunikatowID = msgget(123, IPC_CREAT | 0660 )) == -1)
{
printf("Nie udalo sie utworzyc/pobrac ID kolejki o podanym kluczu.\n");
return EXIT_FAILURE;
}
// ..usunac
msgctl(kolejkaKomunikatowID, IPC_RMID, (struct msqid_ds *) NULL);
// a potem stworzyc na nowo, teraz juz na 100% kolejka jest pusta
if((kolejkaKomunikatowID = msgget(123, IPC_CREAT | 0660 )) == -1)
{
printf("Nie udalo sie utworzyc/pobrac ID kolejki o podanym kluczu.\n");
return EXIT_FAILURE;
}
while(true)
{
// spimy okreslony czas
usleep(czasPomiedzyObslugaSygnalow);
czasOdOstatniejPeriodycznejKomunikacji += czasPomiedzyObslugaSygnalow;
// sygnalow moglo przyjsc kilka lub w trakcie trwania petli mogl dojsc kolejny
// wiec obslugujemy w petli 'while', a nie uzywajac 'if'
while(otrzymalSygnalow > 0)
{
printf("Obsluga sygnalu %d\n", iloscSlaveow);
otrzymalSygnalow -= 1;
// jesli limit slave przekroczony to wylecimy poza pamiec w tablicy!
if(iloscSlaveow == iloscMaksymalnaSlave)
{
printf("Przekroczono limit (%d) obslugiwanych slave!\n", iloscMaksymalnaSlave);
// przerywamy obsluge 'sygnalu' - przechodzimy do komunikacji z juz znanymi slave
break;
}
ret = msgrcv(kolejkaKomunikatowID, &komunikat, sizeof(struct Komunikat), 1, 0);
printf("Odczytano komunikat\n");
if(ret == -1)
{
// -1 = wystapil blad podczas odczytu komunikatu [np. usunieto kolejke komunikatow]
handle_error("Wystapil blad podczas odczytu komunikatu.");
}
else if(ret != sizeof(struct Komunikat))
{
// rozmiar komunikatu jest inny, niz rozmiar naszej struktury
// ktos inny, niz slave wyslal cos do naszej kolejki komunikatow?
printf("Pobrany komunikat ma inny rozmiar, niz rozmiar struktury.\n");
return EXIT_FAILURE;
}
// przygotowujemy nazwe named-pipe ktory chcemy otworzyc
sprintf(buffer, "%ld.to_master", komunikat.pid);
printf("Otwieram named-pipe z przychodzacymi wiadomosciami %s\n", buffer);
wejscie[iloscSlaveow] = open(buffer, O_RDONLY);
if(wejscie[iloscSlaveow] == -1)
{
printf("Nie udalo sie otworzyc named-pipe odbierajacego.\n");
return EXIT_FAILURE;
}
// przygotowujemy nazwe named-pipe ktory chcemy otworzyc
sprintf(buffer, "%ld.to_slave", komunikat.pid);
printf("Otwieram named-pipe z wychodzacymi wiadomosciami %s\n", buffer);
wyjscie[iloscSlaveow] = open(buffer, O_WRONLY);
if(wyjscie[iloscSlaveow] == -1)
{
printf("Nie udalo sie otworzyc named-pipe wysylajacego.\n");
return EXIT_FAILURE;
}
printf("Dodano slave o ID %d\n", iloscSlaveow);
iloscSlaveow += 1;
}
// nie za kazdym obrotem petli chcemy wysylac/odbierac komunikaty
// sprawdzamy czy od ostatniej komunikacji z slaveami minelo dosc czasu
if(czasOdOstatniejPeriodycznejKomunikacji > czasPeriodycznejKomunikacji)
{
// zerujemy licznik czasu
czasOdOstatniejPeriodycznejKomunikacji = 0;
// lecimy petla przez wszystkich slave
for(i = 0; i < iloscSlaveow; i++)
{
// wypelniamy buffer trescia z wejscia od konkretnego slave
// jesli slave przestanie pisac to 'read' stanie w miejscu :(
ret = read(wejscie[i], buffer, sizeof(buffer));
if(ret <= 0)
{
printf("Funkcja read zwrocila error dla slave %d.\n", i);
perror(NULL);
return EXIT_FAILURE;
}
printf("%s Odczytano %d znakow od slave ID %d tresc: %s\n", getTime(), ret, i, buffer);
// wstawiamy do buffer wysylany tekst
strcpy(buffer,"DZIALA!");
// przepisujemy tresc z buffer na wyjscie do konkretnego slave
ret = write(wyjscie[i], buffer, sizeof(buffer));
if(ret == -1)
{
printf("Funkcja write zwrocila error dla slave %d.\n", i);
perror(NULL);
return EXIT_FAILURE;
}
printf("%s Wyslano %d znakow do slave ID %d tresc: %s\n", getTime(), ret, i, buffer);
}
}
}
}
slave.cpp (do pobrania: http://paste.ots.me/404784 )
// Slave
// kompilacja: g++ slave.cpp -o slave && ./slave
/*
Jeden proces (master) oczekuje na pojawienie się podległych mu procesów (slaves). Za pomocą dodatkowego procesu możemy startować podległe procesy. Nowy proces wysyła masterowi sygnał, że się pojawił i poprzez kolejkę komunikatów przekazuje mu namiary na siebie. Od tego momentu master zaczyna komunikować się z nowopowstałym slave’wem periodycznie, według wzorca pytanie/odpowiedź poprzez named pipes, co slave uwidacznia.
*/
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>
#include <sys/time.h>
#include <sys/msg.h>
#include <sys/stat.h>
#define handle_error(msg) do { perror(msg); exit(EXIT_FAILURE); } while (0)
// prosta funkcja zwracajaca czas
struct timeval czas;
int ret; // zmienna przyjmujaca zwracana wartosc z paru funkcji w programie
char* getTime()
{
ret = gettimeofday(&czas, NULL);
char* buffer = (char *) malloc(sizeof(char) * 18);
if(ret == 0)
{
sprintf(buffer, "%ld.%06ld:", czas.tv_sec, czas.tv_usec);
}
else
{
strcpy(buffer, "Error:");
}
return buffer;
}
// struktura do przesylania przez kolejke komunikatow (FIFO)
struct Komunikat
{
// pierwszy w strukturze MUSI byc 'long int' [o dowolnej nazwie],
// w nim bedzie przechowywany 'typ' komunikatu
// w funkcji ktora pobieramy komunikaty:
// msgrcv(kolejkaKomunikatowID, &komunikat, sizeof(struct Komunikat), 1, 0);
// ta '1' to wlasnie typ komunikatu jaki chcemy otrzymac
// (proces 'slave' przed wyslaniem komunikatu ustawia w strukturze typ na '1')
long int typeKomunikatu;
long int pid;
};
int main(int argc, char *argv[])
{
long int pid = (long int) getpid();
char buffer [50];
int kolejkaKomunikatowID;
int pipeWysylajacy;
int pipeOdbierajacy;
Komunikat komunikat;
// tworzymy named pipe do wysylania i odbierania
sprintf(buffer, "%ld.to_master", pid);
printf("Tworze named-pipe %s\n", buffer);
if(mkfifo(buffer,0666) != 0)
{
handle_error("Nie udalo sie utworzyc named-pipe'a do wysylania.");
}
sprintf(buffer, "%ld.to_slave", pid);
printf("Tworze named-pipe %s\n", buffer);
if(mkfifo(buffer,0666) != 0)
{
handle_error("Nie udalo sie utworzyc named-pipe'a do odbierania.");
}
// wysylamy sygnal, zeby proces master zaczal czekac na nasz komunikat
printf("Wysylam sygnal SIGINT do masterProcess\n");
system("killall -2 masterProcess");
// pobieramy ID kolejki komunikatow [utworzonej przez proces master]
printf("Pobieram ID kolejki komunikatow\n");
if((kolejkaKomunikatowID = msgget(123, 0660)) == -1)
{
printf("Nie udalo sie pobrac ID kolejki o podanym kluczu.\n");
return EXIT_FAILURE;
}
// ustalamy typ komunikatu na 1, taki typ bedziemy pobierac w programie 'master' funkcja 'msgrcv'
komunikat.typeKomunikatu = 1;
// przesylamy PID aktualnego procesu,
// bo utworzylismy named-pipe o nazwach zawierajacych pid 'slave'
// ("tu_pid.to_master" i "tu_pid.to_slave")
komunikat.pid = pid;
printf("Wysylam komunikat\n");
if(msgsnd(kolejkaKomunikatowID, &komunikat, sizeof(struct Komunikat), 0) == -1)
{
handle_error("Nie udalo sie wyslac komunikatu.");
}
// komunikat wyslany, sygnal wyslany, named-pipe utworzone [ale nie otwarte]
// teraz otwieramy named-pipe
sprintf(buffer, "%ld.to_master", pid);
printf("Otwieram named-pipe %s\n", buffer);
pipeWysylajacy = open(buffer, O_WRONLY);
if(pipeWysylajacy == -1)
{
printf("Nie udalo sie otworzyc fifo wysylajacego.\n");
return EXIT_FAILURE;
}
sprintf(buffer, "%ld.to_slave", pid);
printf("Otwieram named-pipe %s\n", buffer);
pipeOdbierajacy = open(buffer, O_RDONLY);
if(pipeOdbierajacy == -1)
{
printf("Nie udalo sie otworzyc fifo odbierajacego.\n");
return EXIT_FAILURE;
}
// teraz zaczynamy w petli wysylac 'DZIALA?' i odbierac co nam master napisze
while(true)
{
// wstawiamy do buffer wysylany tekst
strcpy(buffer,"DZIALA?");
// przepisujemy tresc z buffor na wyjscie do master
ret = write(pipeWysylajacy, buffer, sizeof(buffer));
if(ret == -1)
{
printf("Funkcja write zwrocila error.\n");
perror(NULL);
return EXIT_FAILURE;
}
printf("Wyslano %d znakow do %ld tresc: %s\n", ret, komunikat.pid, buffer);
// wypelniamy buffer trescia z wejscia od master
// jesli master przestanie pisac to 'read' stanie w miejscu :(
ret = read(pipeOdbierajacy, buffer, sizeof(buffer));
if(ret <= 0)
{
printf("Funkcja read zwrocila error.\n");
perror(NULL);
return EXIT_FAILURE;
}
printf("Odczytano %d znakow od %ld tresc: %s\n", ret, komunikat.pid, buffer);
// petla nie ma zadnego 'sleep', bo funkcja 'read' zablokuje sie do czasu,
// az master wpisze odpowiedz ('DZIALA!'), wiec to master odpowiada za 'periodycznosc'
}
}