From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from svn.comics.unina.it (unknown [143.225.229.147]) by huchra.bufferbloat.net (Postfix) with ESMTP id CD01820045C for ; Tue, 24 May 2011 07:52:27 -0700 (PDT) Received: from www-data by svn.comics.unina.it with local (Exim 4.69) (envelope-from ) id 1QOtFq-0005Y7-FD for bismark-commits@lists.bufferbloat.net; Tue, 24 May 2011 17:10:42 +0200 To: bismark-commits@lists.bufferbloat.net From: walter@svn.comics.unina.it Message-Id: Date: Tue, 24 May 2011 17:10:42 +0200 Subject: [Bismark-commits] rev 343 - trunk/server/src X-BeenThere: bismark-commits@lists.bufferbloat.net X-Mailman-Version: 2.1.13 Precedence: list List-Id: Commit log for the bismark source code List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Tue, 24 May 2011 14:52:28 -0000 Author: walter Date: 2011-05-24 17:10:42 +0200 (Tue, 24 May 2011) New Revision: 343 Modified: trunk/server/src/bdmd.c Log: multiport bdmd version Modified: trunk/server/src/bdmd.c =================================================================== --- trunk/server/src/bdmd.c 2011-05-23 12:29:36 UTC (rev 342) +++ trunk/server/src/bdmd.c 2011-05-24 15:10:42 UTC (rev 343) @@ -18,6 +18,7 @@ #include #include #include +#include #include #include @@ -28,8 +29,18 @@ typedef struct { struct sockaddr_in cad; /* Client-side socket info */ char *payload; /* Received payload */ + unsigned int ssd_idx; /* Listening socket index */ } thp; +/* Config */ +typedef struct { + char *var_dir; + char *msr_db; + char *bdm_db; + char *msg_db; + char *log_dir; +} config_t; + /* Probe format */ typedef struct { char *id; /* Device id */ @@ -65,12 +76,13 @@ /* * Constants */ -#define BDM_DB "/home/bismark/var/db/bdm.db" -#define MSG_DB "/home/bismark/var/db/msg.db" -#define MSR_DB "/home/bismark/var/db/msr.db" -#define LOG_DIR "/home/bismark/var/log/devices" +#define BDM_DB "db/bdm.db" +#define MSG_DB "db/msg.db" +#define MSR_DB "db/msr.db" +#define LOG_DIR "log/devices" -#define MAX_NUM_THREAD 50 +#define MAX_NUM_THREAD 250 +#define MAX_PORTS 10 #define MAX_UDP_PSIZE 1472 #define MAX_QUERY_LEN 1000 #define MAX_IP_LEN 16 @@ -79,12 +91,15 @@ #define MAX_WAIT_LEN 5 #define MAX_FILENAME_LEN 50 +#define RCV_BUFF_SIZE 300000 + /* * Globals */ int num_thread; /* Threads counter */ struct sockaddr_in sad; /* Server-side socket info */ -int ssd; /* Server socket descriptor */ +int ssd[MAX_PORTS]; /* Server socket descriptors */ +config_t config; /* Config variables */ /* Mutex */ pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; @@ -204,7 +219,7 @@ fflush(stdout); #endif /* Open bdm db */ - if (sqlite3_open(BDM_DB, &bdm_db)) { + if (sqlite3_open(config.bdm_db, &bdm_db)) { fprintf(stderr, "Can't open database: %s\n", sqlite3_errmsg(bdm_db)); sqlite3_close(bdm_db); free(tp); @@ -231,7 +246,7 @@ } /* Check messages */ - sqlite3_open(MSG_DB, &msg_db); + sqlite3_open(config.msg_db, &msg_db); snprintf(query, MAX_QUERY_LEN, "SELECT rowid,* FROM messages WHERE \"to\"='%s' LIMIT 1;", probe.id); if ((row = do_query(msg_db, query, 1))) { mf msg; /* Message row fields */ @@ -275,13 +290,13 @@ char *log = &tp->payload[i + 1]; /* Log data in current packet */ /* Append log to logfile */ - snprintf(logfile, MAX_FILENAME_LEN, "%s/%s.log", LOG_DIR, probe.id); + snprintf(logfile, MAX_FILENAME_LEN, "%s/%s.log", config.log_dir, probe.id); lfp = fopen(logfile, "a"); fprintf(lfp, "%s - %s\n%s\nEND - %s\n", date, probe.param, log, probe.param); fclose(lfp); /* Send message to bdm client */ - sqlite3_open(MSG_DB, &msg_db); + sqlite3_open(config.msg_db, &msg_db); snprintf(query, MAX_QUERY_LEN, "INSERT INTO messages ('from', 'to', msg) VALUES('%s','BDM','%s');", probe.id, probe.param); do_query(msg_db, query, 0); sqlite3_close(msg_db); @@ -306,7 +321,7 @@ probe.param[i] = 0; /* Query target (prefer target with less clients and closest free timestamp) */ - sqlite3_open(MSR_DB, &msr_db); + sqlite3_open(config.msr_db, &msr_db); snprintf(query, MAX_QUERY_LEN, "SELECT t.ip,info,free_ts,curr_cli,max_cli FROM targets AS t, capabilities AS c " "WHERE t.ip=c.ip AND service='%s' AND cat='%s' AND zone='%s' ORDER BY curr_cli,free_ts ASC LIMIT 1;", request.type, request.cat, request.zone); @@ -383,7 +398,7 @@ /* Send reply to client */ if (reply) { - sendto(ssd, reply, strlen(reply), 0, (struct sockaddr*) &tp->cad, sizeof(tp->cad)); + sendto(ssd[tp->ssd_idx], reply, strlen(reply), 0, (struct sockaddr*) &tp->cad, sizeof(tp->cad)); /* Post delivery actions */ if (!strncmp(reply, "fwd", 3)) { @@ -414,87 +429,162 @@ */ int main(int argc, char *argv[]) { - unsigned short server_port; /* Server port number */ - pthread_t hThr[MAX_NUM_THREAD]; /* Threads handlers */ - time_t ts; /* Current timestamp */ - char date[25]; /* Date string */ - int i = 0; + unsigned short server_ports[MAX_PORTS]; /* Server port numbers */ + unsigned short num_ports = 0; /* Number of listening ports */ + int max_sd = 0; /* Biggest socket descriptor */ + fd_set lset; /* Socket descriptors listening set */ + fd_set rset; /* Socket descriptors returned by select */ + pthread_t hThr; /* Thread handler */ + time_t ts; /* Current timestamp */ + char date[25]; /* Date string */ + unsigned int rcv_buff_size = RCV_BUFF_SIZE; + int i = 0, j = 0; /* Command-line check */ - if (argc != 2) { - fprintf(stderr, "usage: %s \n", argv[0]); + if (argc < 2) { + fprintf(stderr, "usage: %s [ [port] ... ]\n", argv[0]); return 0; } - /* Port number parsing */ - server_port = atoi(argv[1]); - if (server_port < 0 || server_port > 65534) { - fprintf(stderr, "invalid port number: %s\n", argv[1]); - return 1; + /* Get port numbers */ + for (i=1; i 65534) { + fprintf(stderr, "warning: invalid port %u\n", server_ports[j]); + continue; + } + if (server_ports[j] < 1024 && getuid() > 0) { + fprintf(stderr, "warning: skipping port %u - root priviledges needed\n", server_ports[j]); + continue; + } + /* Server-side socket info initialization */ + memset((char *) &sad, 0, sizeof(sad)); + sad.sin_family = AF_INET; + sad.sin_addr.s_addr = htonl(INADDR_ANY); + sad.sin_port = htons(server_ports[j]); + + /* Socket creation */ + if ((ssd[j] = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) { + fprintf(stderr, "warning: unable to create socket %u\n", j); + continue; + } + + /* Set the receive buffer length */ + setsockopt(ssd[j], SOL_SOCKET, SO_RCVBUF, &rcv_buff_size, sizeof(rcv_buff_size)); + + /* Socket binding */ + if (bind(ssd[j], (struct sockaddr *) &sad, sizeof(sad)) < 0) { + fprintf(stderr, "warning: bind fallito\n"); + continue; + } + + /* Add socket to the set */ + if (ssd[j] > max_sd) + max_sd = ssd[j]; + FD_SET(ssd[j], &lset); + + /* Output log entry */ + printf("%u ", server_ports[j]); + fflush(stdout); + } + + /* Output log entry */ + printf("\n"); + fflush(stdout); + /* Infinite loop */ while (1) { char payload[MAX_UDP_PSIZE]; /* Temporary buffer to store payload */ struct sockaddr_in cad; /* Temporary Client-side socket info */ socklen_t cadlen; /* Client-side socket info length */ thp *ntp; /* New thread parameters */ + struct timeval time_out; /* Select timeout value */ + int ready_sds; /* Number of detected events */ int bytes; - /* Listen for probe packets */ - cadlen = sizeof(struct sockaddr_in); - bytes = recvfrom(ssd, payload, MAX_UDP_PSIZE, 0, (struct sockaddr*) &cad, &cadlen); - if (bytes <= 0) { - fprintf(stderr, "error receiving probe packet\n"); - continue; - } + /* Update select parameters */ + rset = lset; + time_out.tv_sec = 5; + time_out.tv_usec = 0; - /* Prepare thread parameters */ - ntp = malloc(sizeof(thp)); - ntp->cad = cad; - ntp->payload = calloc(1, bytes + 1); - strncpy(ntp->payload, payload, bytes); + /* Wait for events on the socket descriptors set */ + if ((ready_sds = select(max_sd + 1, &rset, NULL, NULL, &time_out)) < 0) { + perror("warning: select failed"); + continue; + } - /* Check max thread number */ - if (num_thread >= MAX_NUM_THREAD) { - fprintf(stderr, "max connections reached: refusing connection from %s\n", (char *) inet_ntoa(cad.sin_addr)); - free(ntp); - } else { - /* Create thread to handle the connection */ - if (pthread_create(&hThr[i], NULL, doit, ntp) < 0) { - fprintf(stderr, "error creating thread\n"); - } else { - pthread_detach(hThr[i]); + /* Timeout expired */ + if (ready_sds == 0) + continue; + + /* Check all sockets for events */ + for (j=0; jcad = cad; + ntp->payload = calloc(1, bytes + 1); + strncpy(ntp->payload, payload, bytes); + ntp->ssd_idx = j; + + /* Check max thread number */ + if (num_thread >= MAX_NUM_THREAD) { + char reply[MAX_QUERY_LEN]; + + sprintf(reply, "pong %s %lu\n", inet_ntoa(cad.sin_addr), time(NULL)); + sendto(ssd[j], reply, strlen(reply), 0, (struct sockaddr*) &cad, sizeof(cad)); + fprintf(stderr, "max threads reached: quick reply to %s\n", (char *) inet_ntoa(cad.sin_addr)); + free(ntp); + } else { + /* Create thread to handle the connection */ + if (pthread_create(&hThr, NULL, doit, ntp) < 0) { + fprintf(stderr, "warning: unable to create thread\n"); + } else { + pthread_detach(hThr); + } + } } } } /* Close socket */ - close(ssd); + for (j=0; j