ZSO – Trzeci projekt Justyny

Treść zadania:
Mamy N procesów jednowątkowych uruchomionych w klastrze komputerów – na różnych węzłach. Procesy zorganizowane są w listę cykliczną. Między procesami „wędruje” token. Proces P, który ma token może się rozmnożyć jak chce, ale nie musi. Jak się rozmnoży to następny proces jest wstawiany do listy przed procesem P, a token wędruje sobie dalej. Jak lista ma 2*N procesów wówczas wszystkie procesy się kończą

Rozwiązanie może działać na paru węzłach (komputerach). Przykład (skrypt startowy) odpala program parę razy na jednym komputerze na różnych portach.

Kompilacja:

g++ zso3.cpp -o zso3

Skrypt startowy:

#!/bin/bash
iloscProcesow=5
portPoczatkowy=12000
plik=zso3
iloscPortowMiedzyProcesami=$(($iloscProcesow*2+1))

./$plik $portPoczatkowy 127.0.0.1 $(($iloscPortowMiedzyProcesami+$portPoczatkowy)) $iloscProcesow 0 &
for i in `seq 1 $((iloscProcesow-2))`
do
  ./$plik $(($i*$iloscPortowMiedzyProcesami+$portPoczatkowy)) 127.0.0.1 $((($i+1)*$iloscPortowMiedzyProcesami+$portPoczatkowy)) $iloscProcesow $i &
done
./$plik $((($iloscProcesow-1)*$iloscPortowMiedzyProcesami+$portPoczatkowy)) 127.0.0.1 $portPoczatkowy $iloscProcesow $(($iloscProcesow-1)) &

Kod programu (usuń 'sleep’ żeby pkt. nie stracić 🙂 ):

/*
Mamy N procesów jednowątkowych uruchomionych w klastrze komputerów – na różnych węzłach. Procesy zorganizowane są w listę cykliczną. Między procesami „wędruje” token. Proces P, który ma token może się rozmnożyć jak chce, ale nie musi. Jak się rozmnoży to następny proces jest wstawiany do listy przed procesem P, a token wędruje sobie dalej. Jak lista ma 2*N procesów wówczas wszystkie procesy się kończą
*/
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <errno.h>

#include <fcntl.h>
#include <sys/signalfd.h>
#include <signal.h>
#include <semaphore.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <netinet/in.h>
#include <arpa/inet.h>

char wlasneIP[] = "127.0.0.1";
struct sockaddr_in adresDlaSocketuOczekujacegoNaPolaczenia, adresDlaSocketuDoWysylania, adresDlaSocketuKlienta;
char errorBuffer[300];
int socketOczekujacyNaPolaczenia, socketDoWysylania, socketKlienta;
int wielkoscPakietu;
int iloscProcesowToken;

int portDoNasluchiwania;
char* targetIP;
int portDoKtoregoMaWysylac;
int iloscProcesowNaStarcie;
int startowyNumerProcesu;

void handle_error(const char* msg)
{
	fprintf(stderr, "%s\n", msg);
	perror(NULL);
	close(socketOczekujacyNaPolaczenia);
	close(socketDoWysylania);
	close(socketKlienta);
	exit(EXIT_FAILURE);
}

void utworzSocketNasluchujacyNaPorcie(int port)
{
	socketOczekujacyNaPolaczenia = socket(AF_INET, SOCK_STREAM, 0);
	if(socketOczekujacyNaPolaczenia == -1)
	{
		handle_error("Nie udalo sie utworzyc socketu nasluchujacego.");
	}

	int optval = 1;
	setsockopt(socketOczekujacyNaPolaczenia, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof optval);

	bzero((char *) &adresDlaSocketuOczekujacegoNaPolaczenia, sizeof(adresDlaSocketuOczekujacegoNaPolaczenia));
	adresDlaSocketuOczekujacegoNaPolaczenia.sin_family = AF_INET;
	adresDlaSocketuOczekujacegoNaPolaczenia.sin_addr.s_addr = INADDR_ANY;
	adresDlaSocketuOczekujacegoNaPolaczenia.sin_port = htons(port);
	if(bind(socketOczekujacyNaPolaczenia, (struct sockaddr *) &adresDlaSocketuOczekujacegoNaPolaczenia, sizeof(adresDlaSocketuOczekujacegoNaPolaczenia)) == -1)
	{
		sprintf(errorBuffer, "Nie udalo sie powiazac socketu nasluchujacego z INADDR_ANY na porcie %d.", port);
		handle_error(errorBuffer);
	}

 	if(listen(socketOczekujacyNaPolaczenia, 2) == 1)
	{
		handle_error("Nie udalo sie rozpoczac nasluchiwania na sockecie.");
	}
}

void utworzPolaczenieDoWysylaniaTokenu(char* ip, int port)
{
	socketDoWysylania = socket(AF_INET, SOCK_STREAM, 0);
	if(socketDoWysylania == -1)
	{
		handle_error("Nie udalo sie utworzyc socketu wysylajacego.");
	}

	bzero((char *) &adresDlaSocketuDoWysylania, sizeof(adresDlaSocketuDoWysylania));
	adresDlaSocketuDoWysylania.sin_family = AF_INET;
	adresDlaSocketuDoWysylania.sin_addr.s_addr = inet_addr(ip);
	adresDlaSocketuDoWysylania.sin_port = htons(port);
	while(1)
	{
		if(connect(socketDoWysylania, (struct sockaddr *) &adresDlaSocketuDoWysylania, sizeof(adresDlaSocketuDoWysylania)) == -1)
		{
			if(errno != ECONNREFUSED)
			{
				sprintf(errorBuffer, "Nie udalo sie polaczyc z socketu wysylajacego do IP %s do portu %d.", ip, port);
				handle_error(errorBuffer);
			}
			else
			{
				fprintf(stderr, "Proba polaczenia z socketu wysylajacego do IP %s do portu %d nie powiodla sie, polaczenie odrzucone. Czekam chwile.\n", ip, port);
			}
			// 0.1 sec miedzy probami kolejnych polaczen
			usleep(100000);
		}
		else
		{
			break;
		}
	}
	fprintf(stderr, "Polaczono socket wysylajacy do IP %s na port %d z listenera %d.\n", ip, port, portDoNasluchiwania);
}

void czekajNaPolaczenie()
{
	wielkoscPakietu = sizeof(struct sockaddr_in);
	socketKlienta = accept(socketOczekujacyNaPolaczenia,(struct sockaddr *) &adresDlaSocketuKlienta, (socklen_t*) &wielkoscPakietu);
	if(socketKlienta == -1)
	{
		handle_error("Akceptowanie polaczenia nie powiodlo sie.");
	}
	fprintf(stderr, "Klient z IP %s i portu %d\n", inet_ntoa(adresDlaSocketuKlienta.sin_addr), ntohs(adresDlaSocketuKlienta.sin_port));
}

void wyslijToken()
{
	int wyslaneBajty = 0;
	int intDoNeta = htonl(iloscProcesowToken);
	if((wyslaneBajty = send(socketDoWysylania, (const char*) &intDoNeta, 4, 0)) == -1 || wyslaneBajty != 4)
	{
		handle_error("Wysylanie inta sie nie powiodlo.");
	}
fprintf(stderr, "Wyslano token %d\n", iloscProcesowToken);
}

void odbierzToken()
{
	int odebraneBajty = 0;
	if((odebraneBajty = recv(socketKlienta, (void*) &iloscProcesowToken, 4, 0)) == -1 || odebraneBajty != 4)
	{
		handle_error("Odbieranie inta sie nie powiodlo.");
	}
	iloscProcesowToken = ntohl(iloscProcesowToken);
fprintf(stderr, "Odebrano token %d\n", iloscProcesowToken);
}

int main(int argc, char *argv[])
{
	if(argc != 6)
	{
		handle_error("Program wymaga 5 parametrow.");
	}
	portDoNasluchiwania = atoi(argv[1]);
	targetIP = argv[2];
	portDoKtoregoMaWysylac = atoi(argv[3]);
	iloscProcesowNaStarcie = atoi(argv[4]);
	startowyNumerProcesu = atoi(argv[5]);

	iloscProcesowToken = iloscProcesowNaStarcie;
	int startowyPortDoNasluchiwania = portDoNasluchiwania;

	// utworz socket oczekujacy na polaczenia
	utworzSocketNasluchujacyNaPorcie(portDoNasluchiwania);
	if(startowyNumerProcesu == 0) // pierwszy proces
	{
		// polacz sie z celem
		utworzPolaczenieDoWysylaniaTokenu(targetIP, portDoKtoregoMaWysylac);
		// czekaj na polaczenie
		czekajNaPolaczenie();
		// wyslij do celu token (ilosc procesow)
		wyslijToken();
	}
	else // inny niz pierwszy proces
	{
		// czekaj na polaczenie
		czekajNaPolaczenie();
		// polacz sie z celem
		utworzPolaczenieDoWysylaniaTokenu(targetIP, portDoKtoregoMaWysylac);
	}
	while(1)
	{
		usleep(100000);
		// odbierz liczbe
		odbierzToken();
		if(iloscProcesowToken >= iloscProcesowNaStarcie * 2)
		{
			// osiagnieto limit procesow, wyslij nastepnemu na liscie informacje o tym i zakoncz program
			usleep(100000);
			wyslijToken();
			break;
		}
		if(rand() % 2 == 0)
		{
			// dodaj 1 proces do listy przed aktualnym procesem
			int wolnyPortDoKomunikacjiMiedzyDzieckiemRodzicem = startowyPortDoNasluchiwania + iloscProcesowToken;
			pid_t czyToDziecko = fork();
			if(czyToDziecko == 0) // dziecko
			{
				portDoKtoregoMaWysylac = wolnyPortDoKomunikacjiMiedzyDzieckiemRodzicem;
				// polacz sie z rodzicem
				utworzPolaczenieDoWysylaniaTokenu(wlasneIP, portDoKtoregoMaWysylac);
				usleep(100000);
			}
			else // rodzic
			{
				portDoNasluchiwania = wolnyPortDoKomunikacjiMiedzyDzieckiemRodzicem;
				// utworz socket oczekujacy na polaczenie od dziecka
				utworzSocketNasluchujacyNaPorcie(portDoNasluchiwania);
				usleep(100000);
				// czekaj na polaczenie od dziecka
				czekajNaPolaczenie();
				usleep(100000);
				// wprowadz nowa ilosc procesow na liscie
				iloscProcesowToken++;
				// wyslij token (liczbe procesow)
				wyslijToken();
				usleep(100000);
			}
		}
		else
		{
			// wyslij token dalej
			wyslijToken();
			usleep(100000);
		}
	}
	// zamykanie wszystkich socketow danego procesu
	close(socketKlienta);
	close(socketOczekujacyNaPolaczenia);
	close(socketDoWysylania);
	// komunikat na koniec
	fprintf(stderr, "Zamknieto sockety. ID startowe: %d, suma procesow: %d, nasluchiwal na porcie: %d, wysylal do: %d\n", startowyNumerProcesu, iloscProcesowToken, portDoNasluchiwania, portDoKtoregoMaWysylac);
	return 1;
}

Niestety innych zadań trzecich nie mam. Sam 2 pierwsze zadania obroniłem na tyle punktów, że już trzeciego nie musiałem pisać.

Ten wpis został opublikowany w kategorii PJWSTK - ZSO. Dodaj zakładkę do bezpośredniego odnośnika.

Dodaj komentarz

Twój adres e-mail nie zostanie opublikowany. Wymagane pola są oznaczone *