Program - 8
Aim: To implement the two-phase commit protocol using a client-server (coordinator-participant) model.
Code:
Server Side: -
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <time.h>
#define TRUE 1
#define FALSE 0
#define MPROC 20
#define MAX_BUFF 512
#define PORT 5001
typedef struct sockaddr SA;
typedef struct sockaddr_in SA_IN;
/*
 * Terminate the program when a call reported failure.
 *
 * num:         result code to inspect; any negative value counts as failure.
 * quitMessage: text handed to perror() before the process exits.
 *
 * Returns normally when num is zero or positive. Otherwise prints
 * quitMessage through perror() and exits with EXIT_FAILURE.
 */
void verify(int num, char * quitMessage) {
    if (num >= 0) {
        return;
    }
    perror(quitMessage);
    exit(EXIT_FAILURE);
}
/*
 * Create a UDP socket and bind it to port_num on every local interface.
 *
 * port_num: port to bind, in host byte order; must fit in 16 bits.
 *
 * Returns the descriptor of the bound socket. Exits the process (through
 * verify() or directly) when the port is out of range, the socket cannot
 * be created, or the bind fails. A failure to enable SO_REUSEADDR is only
 * reported, because the socket is still usable without it.
 */
int connect_to_port(int port_num){
    int sock_id;
    int opt = 1;
    SA_IN server_addr;

    /* htons() takes a 16-bit value; reject anything it would truncate. */
    if (port_num < 0 || port_num > 65535) {
        fprintf(stderr, "Invalid port number: %d\n", port_num);
        exit(EXIT_FAILURE);
    }

    verify(sock_id = socket(AF_INET, SOCK_DGRAM, 0), "Unable to create socket");

    if (setsockopt(sock_id, SOL_SOCKET, SO_REUSEADDR, (const void *)&opt,
                   sizeof(opt)) < 0) {
        perror("Unable to set SO_REUSEADDR");
    }

    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    server_addr.sin_port = htons((unsigned short)port_num);

    verify(bind(sock_id, (const SA *)&server_addr, sizeof(server_addr)),
           "Unable to bind to port");
    return sock_id;
}
/*
 * Send a text message as one UDP datagram to a port on the local host.
 *
 * port_num: destination port, in host byte order.
 * id:       descriptor of the socket to send from.
 * message:  NUL-terminated text; the terminator itself is not transmitted.
 *
 * A failed send is reported through perror() and otherwise ignored, so the
 * caller keeps running.
 */
void send_to_id(int port_num, int id, const char * message){
    SA_IN cl;

    memset(&cl, 0, sizeof(cl));
    cl.sin_family = AF_INET;
    /* Name the loopback address explicitly rather than the wildcard
     * INADDR_ANY, which is meant for binding, not as a destination. */
    cl.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
    cl.sin_port = htons(port_num);

    if (sendto(id, message, strlen(message), MSG_CONFIRM,
               (const SA *)&cl, sizeof(cl)) < 0) {
        perror("Unable to send message");
    }
}
/*
 * Send the "SCMT" request to every port listed in procs.
 *
 * id:        descriptor of the socket to send from.
 * procs:     array of destination ports.
 * num_procs: number of entries of procs to use.
 *
 * Prints one line per destination before sending to it.
 */
void begin_commit(int id, int *procs, int num_procs){
    char start_msg[MAX_BUFF] = "SCMT";
    int idx = 0;

    while (idx < num_procs) {
        int target_port = procs[idx];

        printf("Sending begin commit to: %d\n", target_port);
        send_to_id(target_port, id, start_msg);
        idx++;
    }
}
/*
 * Send msg to every port listed in procs.
 *
 * self:      descriptor of the socket to send from.
 * procs:     array of destination ports.
 * num_procs: number of entries of procs to use.
 * msg:       NUL-terminated text to send.
 */
void announce_action(int self, int *procs, int num_procs, char msg[MAX_BUFF]){
    int sent = 0;

    while (sent < num_procs) {
        send_to_id(procs[sent], self, msg);
        sent++;
    }
}
int main(int argc, char* argv[]){
int self = atoi(argv[1]);
int n_procs = atoi(argv[2]);
int procs[MPROC];
int sender, okcnt = 0, nocnt = 0, dncnt = 0;
int sock_id, coord_id;
int itr, len, n, start, ix;
char buffer[MAX_BUFF], flag[MAX_BUFF], p_id[MAX_BUFF], msg[MAX_BUFF];
struct sockaddr_in from;
for(itr = 0; itr < n_procs; itr += 1){
procs[itr] = atoi(argv[3 + itr]);
}
printf("Creating node at %d\n", self);
sock_id = connect_to_port(self);
begin_commit(sock_id, procs, n_procs);
while(TRUE){
sleep(2);
memset(&from, 0, sizeof(from));
n = recvfrom(sock_id, (char *)buffer, MAX_BUFF, MSG_WAITALL, (SA
*)&from, &len);
buffer[n] = '\0';
printf("Recieved: %s\n", buffer);
if (strcmp(buffer, "CMOK") == 0){
okcnt += 1;
}
else if (strcmp(buffer, "CMNO") == 0){
nocnt += 1;
}
if ((nocnt + okcnt) == n_procs){
printf("Recieved replies from all clients\n");
if (okcnt == n_procs)
{
printf("Announcing complete commit\n");
announce_action(sock_id, procs, n_procs, "CDON");
}
else
{
printf("Announcing abort commit\n");
announce_action(sock_id, procs, n_procs, "CABT");
}
}
if (strcmp(buffer, "DONE") == 0){
dncnt += 1;
printf("clients confirmed commit\n");
if (dncnt == n_procs)
{
printf("All process announced commit action\n");
exit(EXIT_SUCCESS);
}
}
}
return 0;
}
Client Side: -
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <time.h>
#define TRUE 1
#define FALSE 0
#define MAX_BUFF 512
typedef struct sockaddr SA;
typedef struct sockaddr_in SA_IN;
/*
 * Terminate the program when a call reported failure.
 *
 * num:         result code to inspect; any negative value counts as failure.
 * quitMessage: text handed to perror() before the process exits.
 *
 * Returns normally when num is zero or positive. Otherwise prints
 * quitMessage through perror() and exits with EXIT_FAILURE.
 */
void verify(int num, char * quitMessage) {
    if (num >= 0) {
        return;
    }
    perror(quitMessage);
    exit(EXIT_FAILURE);
}
/*
 * Create a UDP socket and bind it to port_num on every local interface.
 *
 * port_num: port to bind, in host byte order; must fit in 16 bits.
 *
 * Returns the descriptor of the bound socket. Exits the process (through
 * verify() or directly) when the port is out of range, the socket cannot
 * be created, or the bind fails. A failure to enable SO_REUSEADDR is only
 * reported, because the socket is still usable without it.
 */
int connect_to_port(int port_num){
    int sock_id;
    int opt = 1;
    SA_IN server_addr;

    /* htons() takes a 16-bit value; reject anything it would truncate. */
    if (port_num < 0 || port_num > 65535) {
        fprintf(stderr, "Invalid port number: %d\n", port_num);
        exit(EXIT_FAILURE);
    }

    verify(sock_id = socket(AF_INET, SOCK_DGRAM, 0), "Unable to create socket");

    if (setsockopt(sock_id, SOL_SOCKET, SO_REUSEADDR, (const void *)&opt,
                   sizeof(opt)) < 0) {
        perror("Unable to set SO_REUSEADDR");
    }

    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    server_addr.sin_port = htons((unsigned short)port_num);

    verify(bind(sock_id, (const SA *)&server_addr, sizeof(server_addr)),
           "Unable to bind to port");
    return sock_id;
}
/*
 * Send a text message as one UDP datagram to a port on the local host.
 *
 * port_num: destination port, in host byte order.
 * id:       descriptor of the socket to send from.
 * message:  NUL-terminated text; the terminator itself is not transmitted.
 *
 * A failed send is reported through perror() and otherwise ignored, so the
 * caller keeps running.
 */
void send_to_id(int port_num, int id, const char * message){
    SA_IN cl;

    memset(&cl, 0, sizeof(cl));
    cl.sin_family = AF_INET;
    /* Name the loopback address explicitly rather than the wildcard
     * INADDR_ANY, which is meant for binding, not as a destination. */
    cl.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
    cl.sin_port = htons(port_num);

    if (sendto(id, message, strlen(message), MSG_CONFIRM,
               (const SA *)&cl, sizeof(cl)) < 0) {
        perror("Unable to send message");
    }
}
/*
 * Participant entry point.
 *
 * Arguments: argv[1] is the port this process binds, argv[2] is the
 * coordinator's port, and argv[3] is the text sent back when "SCMT"
 * arrives (the coordinator in this write-up counts "CMOK" and "CMNO").
 *
 * Flow: wait for datagrams; answer "SCMT" with the configured vote; on
 * "CDON" or "CABT" reply "DONE" and exit with EXIT_SUCCESS or
 * EXIT_FAILURE respectively.
 *
 * Returns EXIT_FAILURE when arguments are missing; exits with
 * EXIT_FAILURE when receiving fails.
 */
int main(int argc, char* argv[]){
    if (argc < 4) {
        fprintf(stderr, "Usage: %s <self_port> <server_port> <CMOK|CMNO>\n",
                argc > 0 ? argv[0] : "client");
        return EXIT_FAILURE;
    }

    int self = atoi(argv[1]);
    int server = atoi(argv[2]);
    char *action = argv[3];
    char buffer[MAX_BUFF];

    /* Any other vote is still sent as given, but the coordinator shown in
     * this write-up would never count it. */
    if (strcmp(action, "CMOK") != 0 && strcmp(action, "CMNO") != 0) {
        fprintf(stderr, "Warning: vote \"%s\" is neither CMOK nor CMNO\n",
                action);
    }

    printf("Creating node at %d\n", self);
    int sock_id = connect_to_port(self);

    while (TRUE) {
        struct sockaddr_in from;
        socklen_t len = sizeof(from);   /* value-result: must start as the buffer size */
        ssize_t n;

        sleep(2);
        memset(&from, 0, sizeof(from));
        /* Leave one byte free for the terminator added below. */
        n = recvfrom(sock_id, buffer, MAX_BUFF - 1, MSG_WAITALL,
                     (struct sockaddr *)&from, &len);
        if (n < 0) {
            if (errno == EINTR) {
                continue;
            }
            perror("Unable to receive message");
            exit(EXIT_FAILURE);
        }
        buffer[n] = '\0';
        printf("Recieved: %s\n", buffer);

        if (strcmp(buffer, "SCMT") == 0) {
            printf("Sending %s to server\n", action);
            send_to_id(server, sock_id, action);
        }
        else if (strcmp(buffer, "CDON") == 0) {
            printf("Got complete commit, committing to logs\n");
            send_to_id(server, sock_id, "DONE");
            exit(EXIT_SUCCESS);
        }
        else if (strcmp(buffer, "CABT") == 0) {
            printf("Got abort commit, deleting updates\n");
            send_to_id(server, sock_id, "DONE");
            exit(EXIT_FAILURE);
        }
    }
    return 0;
}
Output:
Coordinator: -
Client 1: -
Client 2: -
Client 3: -
Client 4: -