Historic archive of defunct list bismark-commits@lists.bufferbloat.net
 help / color / mirror / Atom feed
* [Bismark-commits] rev 343 - trunk/server/src
@ 2011-05-24 15:10 walter
  0 siblings, 0 replies; only message in thread
From: walter @ 2011-05-24 15:10 UTC (permalink / raw)
  To: bismark-commits

Author: walter
Date: 2011-05-24 17:10:42 +0200 (Tue, 24 May 2011)
New Revision: 343

Modified:
   trunk/server/src/bdmd.c
Log:
multiport bdmd version

Modified: trunk/server/src/bdmd.c
===================================================================
--- trunk/server/src/bdmd.c	2011-05-23 12:29:36 UTC (rev 342)
+++ trunk/server/src/bdmd.c	2011-05-24 15:10:42 UTC (rev 343)
@@ -18,6 +18,7 @@
 #include <string.h>
 #include <stdlib.h>
 #include <unistd.h>
+#include <sys/types.h>
 #include <pthread.h>
 #include <sqlite3.h>
 
@@ -28,8 +29,18 @@
 typedef struct {
 	struct sockaddr_in cad; /* Client-side socket info */
 	char *payload; 		/* Received payload */
+	unsigned int ssd_idx;	/* Listening socket index */
 } thp;
 
+/* Config */
+typedef struct {
+	char *var_dir;
+	char *msr_db;
+	char *bdm_db;
+	char *msg_db;
+	char *log_dir;
+} config_t;
+
 /* Probe format */
 typedef struct {
 	char *id; 	/* Device id */
@@ -65,12 +76,13 @@
 /*
  * Constants
  */
-#define BDM_DB "/home/bismark/var/db/bdm.db"
-#define MSG_DB "/home/bismark/var/db/msg.db"
-#define MSR_DB "/home/bismark/var/db/msr.db"
-#define LOG_DIR "/home/bismark/var/log/devices"
+#define BDM_DB "db/bdm.db"
+#define MSG_DB "db/msg.db"
+#define MSR_DB "db/msr.db"
+#define LOG_DIR "log/devices"
 
-#define MAX_NUM_THREAD 50
+#define MAX_NUM_THREAD 250
+#define MAX_PORTS 10
 #define MAX_UDP_PSIZE 1472
 #define MAX_QUERY_LEN 1000
 #define MAX_IP_LEN 16
@@ -79,12 +91,15 @@
 #define MAX_WAIT_LEN 5
 #define MAX_FILENAME_LEN 50
 
+#define RCV_BUFF_SIZE 300000
+
 /*
  * Globals
  */
 int num_thread; 	/* Threads counter */
 struct sockaddr_in sad; /* Server-side socket info */
-int ssd; 		/* Server socket descriptor */
+int ssd[MAX_PORTS]; 	/* Server socket descriptors */
+config_t config;	/* Config variables */
 
 /* Mutex */
 pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
@@ -204,7 +219,7 @@
 	fflush(stdout);
 #endif
 	/* Open bdm db */
-	if (sqlite3_open(BDM_DB, &bdm_db)) {
+	if (sqlite3_open(config.bdm_db, &bdm_db)) {
 		fprintf(stderr, "Can't open database: %s\n", sqlite3_errmsg(bdm_db));
 		sqlite3_close(bdm_db);
 		free(tp);
@@ -231,7 +246,7 @@
 		}
 
 		/* Check messages */
-		sqlite3_open(MSG_DB, &msg_db);
+		sqlite3_open(config.msg_db, &msg_db);
 		snprintf(query, MAX_QUERY_LEN, "SELECT rowid,* FROM messages WHERE \"to\"='%s' LIMIT 1;", probe.id);
 		if ((row = do_query(msg_db, query, 1))) {
 			mf msg; 	/* Message row fields */
@@ -275,13 +290,13 @@
 		char *log = &tp->payload[i + 1];	/* Log data in current packet */
 
 		/* Append log to logfile */
-		snprintf(logfile, MAX_FILENAME_LEN, "%s/%s.log", LOG_DIR, probe.id);
+		snprintf(logfile, MAX_FILENAME_LEN, "%s/%s.log", config.log_dir, probe.id);
 		lfp = fopen(logfile, "a");
 		fprintf(lfp, "%s - %s\n%s\nEND - %s\n", date, probe.param, log, probe.param);
 		fclose(lfp);
 
 		/* Send message to bdm client */
-		sqlite3_open(MSG_DB, &msg_db);
+		sqlite3_open(config.msg_db, &msg_db);
 		snprintf(query, MAX_QUERY_LEN, "INSERT INTO messages ('from', 'to', msg) VALUES('%s','BDM','%s');", probe.id, probe.param);
 		do_query(msg_db, query, 0);
 		sqlite3_close(msg_db);
@@ -306,7 +321,7 @@
 		probe.param[i] = 0;
 
 		/* Query target (prefer target with less clients and closest free timestamp) */
-		sqlite3_open(MSR_DB, &msr_db);
+		sqlite3_open(config.msr_db, &msr_db);
 		snprintf(query, MAX_QUERY_LEN, "SELECT t.ip,info,free_ts,curr_cli,max_cli FROM targets AS t, capabilities AS c "
 					       "WHERE t.ip=c.ip AND service='%s' AND cat='%s' AND zone='%s' ORDER BY curr_cli,free_ts ASC LIMIT 1;",
 					       request.type, request.cat, request.zone);
@@ -383,7 +398,7 @@
 
 	/* Send reply to client */
 	if (reply) {
-		sendto(ssd, reply, strlen(reply), 0, (struct sockaddr*) &tp->cad, sizeof(tp->cad));
+		sendto(ssd[tp->ssd_idx], reply, strlen(reply), 0, (struct sockaddr*) &tp->cad, sizeof(tp->cad));
 
 		/* Post delivery actions */
 		if (!strncmp(reply, "fwd", 3)) {
@@ -414,87 +429,162 @@
  */
 int main(int argc, char *argv[])
 {
-	unsigned short server_port; 	/* Server port number */
-	pthread_t hThr[MAX_NUM_THREAD]; /* Threads handlers */
-	time_t ts; 			/* Current timestamp */
-	char date[25]; 			/* Date string */
-	int i = 0;
+	unsigned short server_ports[MAX_PORTS]; /* Server port numbers */
+	unsigned short num_ports = 0;		/* Number of listening ports */
+        int max_sd = 0;              		/* Biggest socket descriptor */
+        fd_set lset;          			/* Socket descriptors listening set */
+        fd_set rset;            		/* Socket descriptors returned by select */
+	pthread_t hThr; 			/* Thread handler */
+	time_t ts; 				/* Current timestamp */
+	char date[25]; 				/* Date string */
+	unsigned int rcv_buff_size = RCV_BUFF_SIZE;
+	int i = 0, j = 0;
 
 	/* Command-line check */
-	if (argc != 2) {
-		fprintf(stderr, "usage: %s <port>\n", argv[0]);
+	if (argc < 2) {
+		fprintf(stderr, "usage: %s <port> [ [port] ... ]\n", argv[0]);
 		return 0;
 	}
 
-	/* Port number parsing */
-	server_port = atoi(argv[1]);
-	if (server_port < 0 || server_port > 65534) {
-		fprintf(stderr, "invalid port number: %s\n", argv[1]);
-		return 1;
+	/* Get port numbers */
+	for (i=1; i<argc; i++) {
+		server_ports[i-1] = atoi(argv[i]);
+		num_ports++;
 	}
 
-	/* Server-side socket info initialization */
-	memset((char *) &sad, 0, sizeof(sad));
-	sad.sin_family = AF_INET;
-	sad.sin_addr.s_addr = htonl(INADDR_ANY);
-	sad.sin_port = htons(server_port);
+	/* Set config variables */
+	config.var_dir = getenv("VAR_DIR");
+	if (!config.var_dir) config.var_dir = strdup("/var");
+	config.msr_db = malloc(strlen(config.var_dir) + sizeof(MSR_DB) + 3);
+	sprintf(config.msr_db, "%s/%s", config.var_dir, MSR_DB);
+	config.msg_db = malloc(strlen(config.var_dir) + sizeof(MSG_DB) + 3);
+	sprintf(config.msg_db, "%s/%s", config.var_dir, MSG_DB);
+	config.bdm_db = malloc(strlen(config.var_dir) + sizeof(BDM_DB) + 3);
+	sprintf(config.bdm_db, "%s/%s", config.var_dir, BDM_DB);
+	config.log_dir = malloc(strlen(config.var_dir) + sizeof(LOG_DIR) + 3);
+	sprintf(config.log_dir, "%s/%s", config.var_dir, LOG_DIR);
 
-	/* Socket creation */
-	if ((ssd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
-		fprintf(stderr, "error creating socket\n");
-		return 1;
-	}
-
-	/* Socket binding */
-	if (bind(ssd, (struct sockaddr *) &sad, sizeof(sad)) < 0) {
-		fprintf(stderr, "bind fallito\n");
-		return 1;
-	}
-
 	/* Get timestamp */
 	ts = time(NULL);
 	strftime(date, sizeof(date), "%Y/%m/%d %H:%M:%S", localtime(&ts));
 
 	/* Output log entry */
-	printf("%s - Listening to probe packets on port %d\n", date, server_port);
+	printf("%s - Listening to probe packets on ports: ", date);
 	fflush(stdout);
+	
+	/* Prepare set of listening sockets */
+        FD_ZERO(&lset);
+	for (j=0; j<num_ports; j++) {
+		/* Check port number */
+		if (server_ports[j] < 0 || server_ports[j] > 65534) {
+			fprintf(stderr, "warning: invalid port %u\n", server_ports[j]);
+			continue;
+		}
+		if (server_ports[j] < 1024 && getuid() > 0) {
+			fprintf(stderr, "warning: skipping port %u - root priviledges needed\n", server_ports[j]);
+			continue;
+		}
 
+		/* Server-side socket info initialization */
+		memset((char *) &sad, 0, sizeof(sad));
+		sad.sin_family = AF_INET;
+		sad.sin_addr.s_addr = htonl(INADDR_ANY);
+		sad.sin_port = htons(server_ports[j]);
+
+		/* Socket creation */
+		if ((ssd[j] = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
+			fprintf(stderr, "warning: unable to create socket %u\n", j);
+			continue;
+		}
+
+		/* Set the receive buffer length */
+		setsockopt(ssd[j], SOL_SOCKET, SO_RCVBUF, &rcv_buff_size, sizeof(rcv_buff_size));
+
+		/* Socket binding */
+		if (bind(ssd[j], (struct sockaddr *) &sad, sizeof(sad)) < 0) {
+			fprintf(stderr, "warning: bind fallito\n");
+			continue;
+		}
+		
+		/* Add socket to the set */
+		if (ssd[j] > max_sd) 
+			max_sd = ssd[j];
+		FD_SET(ssd[j], &lset);
+
+		/* Output log entry */
+		printf("%u ", server_ports[j]);
+		fflush(stdout);
+	}
+
+	/* Output log entry */
+	printf("\n");
+	fflush(stdout);
+
 	/* Infinite loop */
 	while (1) {
 		char payload[MAX_UDP_PSIZE];	/* Temporary buffer to store payload */
 		struct sockaddr_in cad;		/* Temporary Client-side socket info */
 		socklen_t cadlen;		/* Client-side socket info length */
 		thp *ntp;			/* New thread parameters */
+		struct timeval time_out;	/* Select timeout value */
+		int ready_sds;			/* Number of detected events */
 		int bytes;
 
-		/* Listen for probe packets */
-		cadlen = sizeof(struct sockaddr_in);
-		bytes = recvfrom(ssd, payload, MAX_UDP_PSIZE, 0, (struct sockaddr*) &cad, &cadlen);
-		if (bytes <= 0) {
-			fprintf(stderr, "error receiving probe packet\n");
-			continue;
-		}
+                /* Update select parameters */
+                rset = lset;
+                time_out.tv_sec = 5;
+                time_out.tv_usec = 0;
 
-		/* Prepare thread parameters */
-		ntp = malloc(sizeof(thp));
-		ntp->cad = cad;
-		ntp->payload = calloc(1, bytes + 1);
-		strncpy(ntp->payload, payload, bytes);
+		/* Wait for events on the socket descriptors set */
+                if ((ready_sds = select(max_sd + 1, &rset, NULL, NULL, &time_out)) < 0) {
+                        perror("warning: select failed");
+                        continue;
+                }
 
-		/* Check max thread number */
-		if (num_thread >= MAX_NUM_THREAD) {
-			fprintf(stderr, "max connections reached: refusing connection from %s\n", (char *) inet_ntoa(cad.sin_addr));
-			free(ntp);
-		} else {
-			/* Create thread to handle the connection */
-			if (pthread_create(&hThr[i], NULL, doit, ntp) < 0) {
-				fprintf(stderr, "error creating thread\n");
-			} else {
-				pthread_detach(hThr[i]);
+                /* Timeout expired */
+                if (ready_sds == 0)
+                        continue;
+
+		/* Check all sockets for events */
+		for (j=0; j<num_ports; j++) {
+	                if (FD_ISSET(ssd[j], &rset)) {
+				/* Read packet from socket */
+				cadlen = sizeof(struct sockaddr_in);
+				bytes = recvfrom(ssd[j], payload, MAX_UDP_PSIZE, 0, (struct sockaddr*) &cad, &cadlen);
+				if (bytes <= 0) {
+					fprintf(stderr, "error receiving probe packet\n");
+					continue;
+				}
+
+				/* Prepare thread parameters */
+				ntp = malloc(sizeof(thp));
+				ntp->cad = cad;
+				ntp->payload = calloc(1, bytes + 1);
+				strncpy(ntp->payload, payload, bytes);
+				ntp->ssd_idx = j;
+
+				/* Check max thread number */
+				if (num_thread >= MAX_NUM_THREAD) {
+					char reply[MAX_QUERY_LEN];
+					
+					sprintf(reply, "pong %s %lu\n", inet_ntoa(cad.sin_addr), time(NULL));
+					sendto(ssd[j], reply, strlen(reply), 0, (struct sockaddr*) &cad, sizeof(cad));
+					fprintf(stderr, "max threads reached: quick reply to %s\n", (char *) inet_ntoa(cad.sin_addr));
+					free(ntp);
+				} else {
+					/* Create thread to handle the connection */
+					if (pthread_create(&hThr, NULL, doit, ntp) < 0) {
+						fprintf(stderr, "warning: unable to create thread\n");
+					} else {
+						pthread_detach(hThr);
+					}
+				}
 			}
 		}
 	}
 
 	/* Close socket */
-	close(ssd);
+	for (j=0; j<num_ports; j++) {
+		close(ssd[j]);
+	}
 }


^ permalink raw reply	[flat|nested] only message in thread

only message in thread, other threads:[~2011-05-24 14:52 UTC | newest]

Thread overview: (only message) (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2011-05-24 15:10 [Bismark-commits] rev 343 - trunk/server/src walter

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox