Never been to DZone Snippets before?

Snippets is a public source code repository. Easily build up your personal collection of code snippets, categorize them with tags / keywords, and share them with the world.

About this user

bob bae http://sparebandwidth.blogspot.com

« Newer Snippets
Older Snippets »
Showing 1-1 of 1 total  RSS 

Windows socket demo program -- uses threads and stuff -- for stress testing the TCP stack

// I wrote this to find out whether the claims made at http://laurentszyster.be/blog/tcp-stack-flaking-out/ are
// valid. I disagree with the author of that post, who says that TCP flakes out on Windows; I think
// his program has obvious bugs. This program proves that, as long as you write your code properly,
// TCP connections do not break randomly. Similar bugs apparently existed in BitTorrent, and they were fixed.
// For more information, visit http://sparebandwidth.blogspot.com/2006/08/more-on-tcp-flaking-out.html

   1  
   /* ---- Stress-test tunables -------------------------------------------- */

   /* Endpoint: main() binds SERVER_PORT; client threads connect to SERVER_ADDR. */
   #define SERVER_ADDR                   "127.0.0.1"
   #define SERVER_PORT                   9999

   /*
    * Load shape.  NUM_THREADS is the number of client threads started and the
    * number of connections main() accepts; it is also used as FD_SETSIZE.
    */
   #define NUM_THREADS                   1000
   #define CLIENT_LIMIT                  1000    /* how many times to send/recv in client */

   /* Size in bytes of the application-level char buffers. */
   #define BUFFER_SIZE                   1000

   /* Values passed to SO_SNDBUF / SO_RCVBUF on the client-side sockets. */
   #define SEND_SOCK_BUFFER_SIZE         34000
   #define RECV_SOCK_BUFFER_SIZE         34000

   /* Values passed to SO_SNDBUF / SO_RCVBUF on the server's accepted sockets. */
   #define SERVER_SEND_SOCK_BUFFER_SIZE  34000
   #define SERVER_RECV_SOCK_BUFFER_SIZE  34000
  11  
  #include <stdio.h>
  #include <stdlib.h>
  #include <string.h>     /* memcpy, memset, strcpy, strlen */

  /* FD_SETSIZE has to be defined before <winsock.h> is included to take effect. */
  #define FD_SETSIZE  NUM_THREADS
  #include <winsock.h>
  16  
  /*
   * fatal_error(arg):  print the caller's running byte totals, the message
   * `arg` and the current WSAGetLastError() code to stderr, flush, exit(1).
   * The expansion reads `total_bytes_received` and `total_bytes_sent`, so two
   * int variables with exactly those names must be in scope at the call site.
   *
   * fatal_error2(arg): print `arg` alone to stderr, flush, exit(1).
   *
   * Both expand to a single do { } while (0) statement so that
   * `if (x) fatal_error("..."); else ...` parses as intended.
   */
  #define fatal_error(arg) do {                                           \
          fprintf(stderr, "total in %d out %d, %s: %d\n",                 \
                  total_bytes_received, total_bytes_sent,                 \
                  (arg), WSAGetLastError());                              \
          fflush(stderr);                                                 \
          exit(1);                                                        \
      } while (0)
  #define fatal_error2(arg) do {                                          \
          fprintf(stderr, "%s", (arg));                                   \
          fflush(stderr);                                                 \
          exit(1);                                                        \
      } while (0)
  22  
  /*
   * Completion flag shared between threads: starter_thread() stores 1 after
   * every client thread has terminated, and main() polls it to leave its
   * service loop.  Declared volatile so that every poll performs a real load.
   * NOTE(review): volatile is not a portable synchronisation primitive; if
   * this is ever built with a C11 toolchain, an atomic type would be the
   * stricter choice -- confirm against the compiler in use.
   */
  volatile int threads_all_done = 0;
  24  
  25  struct thread_args {
  26  	int id;
  27  	int server_sock;
  28  	struct sockaddr_in server_addr;
  29  };
  30  
  31  void *client_thread(void *args)
  32  {
  33  	struct thread_args *client_thread_arg;
  34  	struct sockaddr_in server_addr;
  35  	int server_sock;
  36  	int client_sock;
  37  	int on = 1;
  38  	int bytes_received, bytes_sent;
  39  	int num_to_read;
  40  	int total_bytes_received, total_bytes_sent;
  41  	char buffer[BUFFER_SIZE];
  42  	int size;
  43  	int i;
  44  
  45  	total_bytes_received = total_bytes_sent = 0;
  46  	
  47  	client_thread_arg = (struct thread_args *)args;
  48  	memcpy(&server_addr, &client_thread_arg->server_addr,
  49  	       sizeof(struct sockaddr_in));
  50  	server_sock = client_thread_arg->server_sock;
  51  
  52  	if ((client_sock = socket(PF_INET, SOCK_STREAM,
  53  				  IPPROTO_TCP)) < 0)
  54  		fatal_error("socket() error in client thread");
  55  
  56  	if (connect(client_sock, (struct sockaddr *)&server_addr,
  57  		    sizeof(server_addr)) < 0)
  58  		fatal_error("connect() error in client thread");
  59  
  60  	/* wait for the server to send startup message */
  61  	if ((bytes_received =
  62  	     recv(client_sock, buffer, BUFFER_SIZE, 0)) < 0)
  63  		fatal_error("recv() error in client thread");
  64  
  65  	if (ioctlsocket(client_sock, FIONBIO, &on) < 0)
  66  		fatal_error("FIONBIO error in client thread");
  67  
  68  	size = SEND_SOCK_BUFFER_SIZE;
  69  	if (setsockopt(client_sock, SOL_SOCKET, SO_SNDBUF, &size, sizeof(size)) < 0)
  70  		fatal_error("SO_SNDBUF error in client thread");
  71  	size = RECV_SOCK_BUFFER_SIZE;
  72  	if (setsockopt(client_sock, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)) < 0)
  73  		fatal_error("SO_RCVBUF error in client thread");
  74  	
  75  	for (i = 0; i < CLIENT_LIMIT ; i++) {
  76  		int errno;
  77  		
  78  		if ((bytes_sent =
  79  		     send(client_sock, buffer,
  80  			  BUFFER_SIZE, 0)) != BUFFER_SIZE) {
  81  			errno = WSAGetLastError();
  82  			if (errno == WSAEWOULDBLOCK ||
  83  				errno == WSAENOBUFS)
  84  				continue;
  85  			
  86  			fatal_error("send() error in client thread");
  87  		}
  88  		total_bytes_sent += bytes_sent;
  89  
  90  		num_to_read = 0;
  91  		if (ioctlsocket(client_sock, FIONREAD, &num_to_read) < 0) 
  92  			fatal_error("FIONREAD error in client thread");
  93  		
  94  		if (num_to_read == 0)
  95  			continue;
  96  		
  97  		if ((bytes_received =
  98  		     recv(client_sock, buffer, 
  99  			  num_to_read >= BUFFER_SIZE ?
 100  			  BUFFER_SIZE - 1 : num_to_read, 0)) <= 0)
 101  			fatal_error("recv() error in client thread");
 102  		total_bytes_received += bytes_received; 		
 103  	}
 104  
 105  	printf("thread %d total bytes received %d sent %d\n",
 106  	       client_thread_arg->id, total_bytes_received, total_bytes_sent);
 107  
 108  #if 0
 109  	closesocket(client_sock);
 110  #endif
 111  	free(args);
 112  }
 113  
 114  void *starter_thread(void *args)
 115  {
 116  	HANDLE thread_handles[NUM_THREADS];
 117  	DWORD thread_ids[NUM_THREADS];
 118  	struct thread_args *client_thread_arg;
 119  	int i;
 120  	
 121  	for (i = 0; i < NUM_THREADS; i++) {
 122  		if ((client_thread_arg = 
 123  		     (struct thread_args *)
 124  		     malloc(sizeof(struct thread_args))) == 0) 
 125  			fatal_error2("malloc client_thread_arg error");
 126  		
 127  		memcpy(client_thread_arg, args, sizeof(struct thread_args));
 128  		client_thread_arg->id = i;
 129  
 130  		if ((thread_handles[i] =
 131  		     CreateThread(0, 0,
 132  				  (LPTHREAD_START_ROUTINE) client_thread,
 133  				  client_thread_arg, 0,
 134  				  (LPDWORD)&thread_ids[i])) == 0) 
 135  			fatal_error2("CreateThread for clients error");
 136  
 137  		printf("created client thread %d\n", i); fflush(stdout);
 138  	}
 139  	free(args);
 140  	args = 0;
 141  
 142  	printf("%d threads created\n", NUM_THREADS); fflush(stdout);
 143  	
 144  	for (i = 0; i < NUM_THREADS; i++) 
 145  		WaitForSingleObject(thread_handles[i], INFINITE);
 146  	
 147  	printf("all threads terminated\n"); fflush(stdout);
 148  
 149  	threads_all_done = 1;
 150  	
 151  	ExitThread(0);
 152  }
 153  
 154  int main(void)
 155  {
 156  	int server_sock, client_sock;    
 157  	struct sockaddr_in server_addr, client_addr; 
 158  	int client_addr_len;
 159  	unsigned short server_port;   
 160  	char buffer[BUFFER_SIZE];    
 161  	int bytes_received, total_bytes_received; 
 162  	int bytes_sent, total_bytes_sent;
 163  	int on = 1;
 164  	unsigned long num_to_read;
 165  	struct timeval timeout;
 166  	struct thread_args *starter_thread_arg;
 167  	WSADATA wsa_data;     
 168  	HANDLE starter_thread_handle;
 169  	DWORD starter_thread_id;
 170  	int client_sockets[NUM_THREADS];
 171  	fd_set socket_fds;
 172  	int size;
 173  	int i;
 174  
 175  	total_bytes_received = total_bytes_sent = 0;
 176  	
 177  	if (WSAStartup(MAKEWORD(2, 0), &wsa_data) != 0) {
 178  		fprintf(stderr, "WSAStartup() error");
 179  		exit(1);
 180  	}
 181  	
 182  	total_bytes_received = 0;
 183  	total_bytes_sent = 0;
 184  	
 185  	if ((server_sock = socket(PF_INET, SOCK_STREAM,
 186  				  IPPROTO_TCP)) < 0)
 187  		fatal_error("socket() error");
 188  	
 189  	memset(&client_addr, 0, sizeof(client_addr));     
 190  	memset(&server_addr, 0, sizeof(server_addr));     
 191  	server_addr.sin_family  = AF_INET;       
 192  	server_addr.sin_addr.s_addr = htonl(INADDR_ANY); 
 193  	server_addr.sin_port = htons(SERVER_PORT);
 194  	
 195  	if (bind(server_sock, (struct sockaddr *) &server_addr,
 196  		 sizeof(server_addr)) < 0)
 197  		fatal_error ("bind() error ");
 198  	if (listen(server_sock, 5) < 0)
 199  		fatal_error("listen() error");
 200  	
 201  	if ((starter_thread_arg =
 202  	     (struct thread_args *)
 203  	     malloc(sizeof(struct thread_args))) == 0)
 204  		fatal_error("malloc starter_thread_arg error");
 205  
 206  	starter_thread_arg->server_sock = server_sock;
 207  	server_addr.sin_addr.s_addr = inet_addr(SERVER_ADDR);
 208  	memcpy(&starter_thread_arg->server_addr , &server_addr,
 209  	       sizeof(server_addr));
 210  	starter_thread_arg->id = -1;
 211  	
 212  	if ((starter_thread_handle =
 213  	     CreateThread(0,0,
 214  			  (LPTHREAD_START_ROUTINE)starter_thread,
 215  			  starter_thread_arg, 0,
 216  			  (LPDWORD)&starter_thread_id)) == 0)
 217  		fatal_error("CreateThread for starter thread error");
 218  	
 219  	for (i = 0; i < NUM_THREADS; i++) {
 220  		client_addr_len = sizeof(client_addr);
 221  		if ((client_sock = accept(server_sock,
 222  					  (struct sockaddr *) &client_addr,
 223  					  &client_addr_len)) < 0)
 224  			fatal_error("accept() error");
 225  #if 0
 226  		printf("connection accepted from %s\n",
 227  		       inet_ntoa(client_addr.sin_addr));
 228  #endif
 229  		client_sockets[i] = client_sock;
 230  	}
 231  	printf("%d clients accepted\n", NUM_THREADS);
 232  
 233  	/* send startup message to all clients */
 234  	for (i = 0; i < NUM_THREADS; i++) {
 235  		int startup_msg_len ;
 236  
 237  		client_sock = client_sockets[i];
 238  		memset(buffer, 0, sizeof(buffer));
 239  		strcpy(buffer, "startup");
 240  		startup_msg_len = strlen(buffer);
 241  		if ((bytes_sent =
 242  		     send(client_sock, buffer,
 243  			  startup_msg_len, 0)) != startup_msg_len)
 244  			fatal_error("sending start signal failed");
 245  
 246  		size = SERVER_SEND_SOCK_BUFFER_SIZE;
 247  		if (setsockopt(client_sock, SOL_SOCKET, SO_SNDBUF, &size, sizeof(size)) < 0)
 248  			fatal_error("SO_SNDBUF error  ");
 249  		size = SERVER_RECV_SOCK_BUFFER_SIZE;
 250  		if (setsockopt(client_sock, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)) < 0)
 251  			fatal_error("SO_RCVBUF error ");
 252  	
 253  		if (ioctlsocket(client_sock, FIONBIO, &on) < 0)
 254  			fatal_error("FIONBIO error");
 255  	}
 256  
 257  	printf("sent startup messages\n"); fflush(stdout);
 258  	for (;;)  {
 259  		int half = BUFFER_SIZE / 2;
 260  
 261  		if (threads_all_done) break;
 262  
 263  		Sleep(100);	/* slow down server artificially */
 264  		
 265  		FD_ZERO(&socket_fds);
 266  		for (i = 0; i < NUM_THREADS; i++)
 267  			FD_SET(client_sockets[i], &socket_fds);
 268  		timeout.tv_sec = 0;
 269  		timeout.tv_usec = 10000;
 270  
 271  		if (select(0, &socket_fds, 0, 0, &timeout) == 0)
 272  			continue;
 273  
 274  		for (i = 0; i < NUM_THREADS; i++) {
 275  			int errno;
 276  			
 277  			if (! FD_ISSET(client_sockets[i], &socket_fds))
 278  				continue;
 279  
 280  			client_sock = client_sockets[i];
 281  
 282  			/* send half as much */
 283  			if ((bytes_sent =
 284  			     send(client_sock, buffer,
 285  				 half, 0)) != half) {
 286  				errno = WSAGetLastError();
 287  				if (errno == WSAEWOULDBLOCK ||
 288  				    errno == WSAENOBUFS)
 289  					continue;
 290  				fatal_error("send() error");
 291  			}
 292  			total_bytes_sent += bytes_sent;
 293  			
 294  			num_to_read = 0;
 295  			if (ioctlsocket(client_sock, FIONREAD, &num_to_read) < 0) 
 296  				fatal_error("FIONREAD error");
 297  			
 298  			if (num_to_read == 0)
 299  				continue;
 300  			
 301  			/* read half as much */
 302  			num_to_read = num_to_read >= half ? half - 1 : num_to_read;
 303  			if ((bytes_received =
 304  			     recv(client_sock, buffer, num_to_read, 0)) <= 0)
 305  				fatal_error("recv() error ");
 306  			total_bytes_received += bytes_received;
 307  		}
 308  	}
 309  	
 310  	closesocket(server_sock);
 311  	closesocket(client_sock);
 312  	WSACleanup();
 313  	WaitForSingleObject(starter_thread_handle, INFINITE);
 314  
 315  	printf("all threads finished. total bytes received %d sent %d\n",
 316  	       total_bytes_received, total_bytes_sent);
 317  	
 318  	exit(0);
 319  }
 320  
« Newer Snippets
Older Snippets »
Showing 1-1 of 1 total  RSS