Treść:
Jeden proces (master) oczekuje na pojawienie się podległych mu procesów (slaves). Za pomocą dodatkowego procesu możemy startować podległe procesy. Nowy proces wysyła masterowi sygnał, że się pojawił i poprzez kolejkę komunikatów przekazuje mu namiary na siebie. Od tego momentu master zaczyna komunikować się z nowopowstałym slave’wem periodycznie, według wzorca pytanie/odpowiedź poprzez named pipes, co slave uwidacznia.
Rozwiązanie składa się z dwóch programów: master.cpp i slave.cpp
Master musi być odpalony pierwszy (tworzy kolejkę komunikatów i czeka na sygnał). Master musi być odpalany z nazwą 'masterProcess’ – do procesu o takiej nazwie 'slave’ wysyła sygnał.
Dodałem sporo komentarzy tłumaczących co i jak działa.
master.cpp (do pobrania: http://paste.ots.me/404783 )
// Master // kompilacja: g++ master.cpp -lstdc++ -o masterProcess && ./masterProcess /* Jeden proces (master) oczekuje na pojawienie się podległych mu procesów (slaves). Za pomocą dodatkowego procesu możemy startować podległe procesy. Nowy proces wysyła masterowi sygnał, że się pojawił i poprzez kolejkę komunikatów przekazuje mu namiary na siebie. Od tego momentu master zaczyna komunikować się z nowopowstałym slave’wem periodycznie, według wzorca pytanie/odpowiedź poprzez named pipes, co slave uwidacznia. */ #include <stdio.h> #include <string.h> #include <unistd.h> #include <stdlib.h> #include <fcntl.h> #include <signal.h> #include <sys/time.h> #include <sys/msg.h> #define handle_error(msg) do { perror(msg); exit(EXIT_FAILURE); } while (0) // prosta funkcja zwracajaca czas struct timeval czas; int ret; // zmienna przyjmujaca zwracana wartosc z paru funkcji w programie char* getTime() { ret = gettimeofday(&czas, NULL); char* buffer = (char *) malloc(sizeof(char) * 18); if(ret == 0) { sprintf(buffer, "%ld.%06ld:", czas.tv_sec, czas.tv_usec); } else { strcpy(buffer, "Error:"); } return buffer; } // funkcja obslugujaca przychodzace sygnaly (tylko SIGINT) // sygnaly sa zliczane, a nie zapamietywane jako 0 lub 1 ('czy przyszedl'), // bo moze przyjsc kilka zanim zacznie sie ich obsluga w 'main' int otrzymalSygnalow = 0; static void obslugaSygnalu(int signum) { otrzymalSygnalow += 1; printf("Otrzymano sygnal SIGINT\n"); } // struktura do przesylania przez kolejke komunikatow (FIFO) struct Komunikat { // pierwszy w strukturze MUSI byc 'long int' [o dowolnej nazwie], // w nim bedzie przechowywany 'typ' komunikatu // w funkcji ktora pobieramy komunikaty: // msgrcv(kolejkaKomunikatowID, &komunikat, sizeof(struct Komunikat), 1, 0); // ta '1' to wlasnie typ komunikatu jaki chcemy otrzymac // (proces 'slave' przed wyslaniem komunikatu ustawia w strukturze typ na '1') long int typeKomunikatu; long int pid; }; // limit obslugiwanych slave, tej wartosci uzywamy przy tworzeniu tablic z wejsciami i wyjsciami named-pipe int iloscMaksymalnaSlave = 30; // czas miedzy sprawdzeniami czy jest jakis sygnal int czasPomiedzyObslugaSygnalow = 1000; // 0.001 sec - 1 ms // czas pomiedzy obsluga sygnalow i periodyczna komunikacja w mikrosekundach int czasPeriodycznejKomunikacji = 500000; // 0.5 sec int main(int argc, char *argv[]) { int kolejkaKomunikatowID; int i; // iterator do 'for' Komunikat komunikat; // komunikat przesylany przez kolejke komunikatow char buffer [50]; // taki sam rozmiar jak w slave int iloscSlaveow = 0; // ilosc juz obslugiwanych slave int wejscie[iloscMaksymalnaSlave]; // named-pipe wejscia od slave int wyjscie[iloscMaksymalnaSlave]; // named-pipe wyjscia do slave int czasOdOstatniejPeriodycznejKomunikacji = 0; // zlicza czas od ostatniej periodycznej komunikacji z slave // ustawianie funkcji 'obslugaSygnalu' jako tej ktora obsluguje sygnal 'SIGINT' (ctrl+c) struct sigaction akcja; akcja.sa_handler = obslugaSygnalu; akcja.sa_flags = 0; sigaction(SIGINT, &akcja, NULL); // koniec ustawiania // tworze lub pobieram kolejke komunikatow o ID 123 // jako, ze mogla juz istniec i cos w niej moglo byc to lepiej ja.. if((kolejkaKomunikatowID = msgget(123, IPC_CREAT | 0660 )) == -1) { printf("Nie udalo sie utworzyc/pobrac ID kolejki o podanym kluczu.\n"); return EXIT_FAILURE; } // ..usunac msgctl(kolejkaKomunikatowID, IPC_RMID, (struct msqid_ds *) NULL); // a potem stworzyc na nowo, teraz juz na 100% kolejka jest pusta if((kolejkaKomunikatowID = msgget(123, IPC_CREAT | 0660 )) == -1) { printf("Nie udalo sie utworzyc/pobrac ID kolejki o podanym kluczu.\n"); return EXIT_FAILURE; } while(true) { // spimy okreslony czas usleep(czasPomiedzyObslugaSygnalow); czasOdOstatniejPeriodycznejKomunikacji += czasPomiedzyObslugaSygnalow; // sygnalow moglo przyjsc kilka lub w trakcie trwania petli mogl dojsc kolejny // wiec obslugujemy w petli 'while', a nie uzywajac 'if' while(otrzymalSygnalow > 0) { printf("Obsluga sygnalu %d\n", iloscSlaveow); otrzymalSygnalow -= 1; // jesli limit slave przekroczony to wylecimy poza pamiec w tablicy! if(iloscSlaveow == iloscMaksymalnaSlave) { printf("Przekroczono limit (%d) obslugiwanych slave!\n", iloscMaksymalnaSlave); // przerywamy obsluge 'sygnalu' - przechodzimy do komunikacji z juz znanymi slave break; } ret = msgrcv(kolejkaKomunikatowID, &komunikat, sizeof(struct Komunikat), 1, 0); printf("Odczytano komunikat\n"); if(ret == -1) { // -1 = wystapil blad podczas odczytu komunikatu [np. usunieto kolejke komunikatow] handle_error("Wystapil blad podczas odczytu komunikatu."); } else if(ret != sizeof(struct Komunikat)) { // rozmiar komunikatu jest inny, niz rozmiar naszej struktury // ktos inny, niz slave wyslal cos do naszej kolejki komunikatow? printf("Pobrany komunikat ma inny rozmiar, niz rozmiar struktury.\n"); return EXIT_FAILURE; } // przygotowujemy nazwe named-pipe ktory chcemy otworzyc sprintf(buffer, "%ld.to_master", komunikat.pid); printf("Otwieram named-pipe z przychodzacymi wiadomosciami %s\n", buffer); wejscie[iloscSlaveow] = open(buffer, O_RDONLY); if(wejscie[iloscSlaveow] == -1) { printf("Nie udalo sie otworzyc named-pipe odbierajacego.\n"); return EXIT_FAILURE; } // przygotowujemy nazwe named-pipe ktory chcemy otworzyc sprintf(buffer, "%ld.to_slave", komunikat.pid); printf("Otwieram named-pipe z wychodzacymi wiadomosciami %s\n", buffer); wyjscie[iloscSlaveow] = open(buffer, O_WRONLY); if(wyjscie[iloscSlaveow] == -1) { printf("Nie udalo sie otworzyc named-pipe wysylajacego.\n"); return EXIT_FAILURE; } printf("Dodano slave o ID %d\n", iloscSlaveow); iloscSlaveow += 1; } // nie za kazdym obrotem petli chcemy wysylac/odbierac komunikaty // sprawdzamy czy od ostatniej komunikacji z slaveami minelo dosc czasu if(czasOdOstatniejPeriodycznejKomunikacji > czasPeriodycznejKomunikacji) { // zerujemy licznik czasu czasOdOstatniejPeriodycznejKomunikacji = 0; // lecimy petla przez wszystkich slave for(i = 0; i < iloscSlaveow; i++) { // wypelniamy buffer trescia z wejscia od konkretnego slave // jesli slave przestanie pisac to 'read' stanie w miejscu :( ret = read(wejscie[i], buffer, sizeof(buffer)); if(ret <= 0) { printf("Funkcja read zwrocila error dla slave %d.\n", i); perror(NULL); return EXIT_FAILURE; } printf("%s Odczytano %d znakow od slave ID %d tresc: %s\n", getTime(), ret, i, buffer); // wstawiamy do buffer wysylany tekst strcpy(buffer,"DZIALA!"); // przepisujemy tresc z buffer na wyjscie do konkretnego slave ret = write(wyjscie[i], buffer, sizeof(buffer)); if(ret == -1) { printf("Funkcja write zwrocila error dla slave %d.\n", i); perror(NULL); return EXIT_FAILURE; } printf("%s Wyslano %d znakow do slave ID %d tresc: %s\n", getTime(), ret, i, buffer); } } } }
slave.cpp (do pobrania: http://paste.ots.me/404784 )
// Slave // kompilacja: g++ slave.cpp -o slave && ./slave /* Jeden proces (master) oczekuje na pojawienie się podległych mu procesów (slaves). Za pomocą dodatkowego procesu możemy startować podległe procesy. Nowy proces wysyła masterowi sygnał, że się pojawił i poprzez kolejkę komunikatów przekazuje mu namiary na siebie. Od tego momentu master zaczyna komunikować się z nowopowstałym slave’wem periodycznie, według wzorca pytanie/odpowiedź poprzez named pipes, co slave uwidacznia. */ #include <stdio.h> #include <unistd.h> #include <stdlib.h> #include <string.h> #include <fcntl.h> #include <sys/time.h> #include <sys/msg.h> #include <sys/stat.h> #define handle_error(msg) do { perror(msg); exit(EXIT_FAILURE); } while (0) // prosta funkcja zwracajaca czas struct timeval czas; int ret; // zmienna przyjmujaca zwracana wartosc z paru funkcji w programie char* getTime() { ret = gettimeofday(&czas, NULL); char* buffer = (char *) malloc(sizeof(char) * 18); if(ret == 0) { sprintf(buffer, "%ld.%06ld:", czas.tv_sec, czas.tv_usec); } else { strcpy(buffer, "Error:"); } return buffer; } // struktura do przesylania przez kolejke komunikatow (FIFO) struct Komunikat { // pierwszy w strukturze MUSI byc 'long int' [o dowolnej nazwie], // w nim bedzie przechowywany 'typ' komunikatu // w funkcji ktora pobieramy komunikaty: // msgrcv(kolejkaKomunikatowID, &komunikat, sizeof(struct Komunikat), 1, 0); // ta '1' to wlasnie typ komunikatu jaki chcemy otrzymac // (proces 'slave' przed wyslaniem komunikatu ustawia w strukturze typ na '1') long int typeKomunikatu; long int pid; }; int main(int argc, char *argv[]) { long int pid = (long int) getpid(); char buffer [50]; int kolejkaKomunikatowID; int pipeWysylajacy; int pipeOdbierajacy; Komunikat komunikat; // tworzymy named pipe do wysylania i odbierania sprintf(buffer, "%ld.to_master", pid); printf("Tworze named-pipe %s\n", buffer); if(mkfifo(buffer,0666) != 0) { handle_error("Nie udalo sie utworzyc named-pipe'a do wysylania."); } sprintf(buffer, "%ld.to_slave", pid); printf("Tworze named-pipe %s\n", buffer); if(mkfifo(buffer,0666) != 0) { handle_error("Nie udalo sie utworzyc named-pipe'a do odbierania."); } // wysylamy sygnal, zeby proces master zaczal czekac na nasz komunikat printf("Wysylam sygnal SIGINT do masterProcess\n"); system("killall -2 masterProcess"); // pobieramy ID kolejki komunikatow [utworzonej przez proces master] printf("Pobieram ID kolejki komunikatow\n"); if((kolejkaKomunikatowID = msgget(123, 0660)) == -1) { printf("Nie udalo sie pobrac ID kolejki o podanym kluczu.\n"); return EXIT_FAILURE; } // ustalamy typ komunikatu na 1, taki typ bedziemy pobierac w programie 'master' funkcja 'msgrcv' komunikat.typeKomunikatu = 1; // przesylamy PID aktualnego procesu, // bo utworzylismy named-pipe o nazwach zawierajacych pid 'slave' // ("tu_pid.to_master" i "tu_pid.to_slave") komunikat.pid = pid; printf("Wysylam komunikat\n"); if(msgsnd(kolejkaKomunikatowID, &komunikat, sizeof(struct Komunikat), 0) == -1) { handle_error("Nie udalo sie wyslac komunikatu."); } // komunikat wyslany, sygnal wyslany, named-pipe utworzone [ale nie otwarte] // teraz otwieramy named-pipe sprintf(buffer, "%ld.to_master", pid); printf("Otwieram named-pipe %s\n", buffer); pipeWysylajacy = open(buffer, O_WRONLY); if(pipeWysylajacy == -1) { printf("Nie udalo sie otworzyc fifo wysylajacego.\n"); return EXIT_FAILURE; } sprintf(buffer, "%ld.to_slave", pid); printf("Otwieram named-pipe %s\n", buffer); pipeOdbierajacy = open(buffer, O_RDONLY); if(pipeOdbierajacy == -1) { printf("Nie udalo sie otworzyc fifo odbierajacego.\n"); return EXIT_FAILURE; } // teraz zaczynamy w petli wysylac 'DZIALA?' i odbierac co nam master napisze while(true) { // wstawiamy do buffer wysylany tekst strcpy(buffer,"DZIALA?"); // przepisujemy tresc z buffor na wyjscie do master ret = write(pipeWysylajacy, buffer, sizeof(buffer)); if(ret == -1) { printf("Funkcja write zwrocila error.\n"); perror(NULL); return EXIT_FAILURE; } printf("Wyslano %d znakow do %ld tresc: %s\n", ret, komunikat.pid, buffer); // wypelniamy buffer trescia z wejscia od master // jesli master przestanie pisac to 'read' stanie w miejscu :( ret = read(pipeOdbierajacy, buffer, sizeof(buffer)); if(ret <= 0) { printf("Funkcja read zwrocila error.\n"); perror(NULL); return EXIT_FAILURE; } printf("Odczytano %d znakow od %ld tresc: %s\n", ret, komunikat.pid, buffer); // petla nie ma zadnego 'sleep', bo funkcja 'read' zablokuje sie do czasu, // az master wpisze odpowiedz ('DZIALA!'), wiec to master odpowiada za 'periodycznosc' } }