* [Bismark-commits] rev 343 - trunk/server/src
@ 2011-05-24 15:10 walter
0 siblings, 0 replies; only message in thread
From: walter @ 2011-05-24 15:10 UTC (permalink / raw)
To: bismark-commits
Author: walter
Date: 2011-05-24 17:10:42 +0200 (Tue, 24 May 2011)
New Revision: 343
Modified:
trunk/server/src/bdmd.c
Log:
multiport bdmd version
Modified: trunk/server/src/bdmd.c
===================================================================
--- trunk/server/src/bdmd.c 2011-05-23 12:29:36 UTC (rev 342)
+++ trunk/server/src/bdmd.c 2011-05-24 15:10:42 UTC (rev 343)
@@ -18,6 +18,7 @@
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
+#include <sys/types.h>
#include <pthread.h>
#include <sqlite3.h>
@@ -28,8 +29,18 @@
typedef struct {
struct sockaddr_in cad; /* Client-side socket info */
char *payload; /* Received payload */
+ unsigned int ssd_idx; /* Listening socket index */
} thp;
+/* Config */
+typedef struct {
+ char *var_dir;
+ char *msr_db;
+ char *bdm_db;
+ char *msg_db;
+ char *log_dir;
+} config_t;
+
/* Probe format */
typedef struct {
char *id; /* Device id */
@@ -65,12 +76,13 @@
/*
* Constants
*/
-#define BDM_DB "/home/bismark/var/db/bdm.db"
-#define MSG_DB "/home/bismark/var/db/msg.db"
-#define MSR_DB "/home/bismark/var/db/msr.db"
-#define LOG_DIR "/home/bismark/var/log/devices"
+#define BDM_DB "db/bdm.db"
+#define MSG_DB "db/msg.db"
+#define MSR_DB "db/msr.db"
+#define LOG_DIR "log/devices"
-#define MAX_NUM_THREAD 50
+#define MAX_NUM_THREAD 250
+#define MAX_PORTS 10
#define MAX_UDP_PSIZE 1472
#define MAX_QUERY_LEN 1000
#define MAX_IP_LEN 16
@@ -79,12 +91,15 @@
#define MAX_WAIT_LEN 5
#define MAX_FILENAME_LEN 50
+#define RCV_BUFF_SIZE 300000
+
/*
* Globals
*/
int num_thread; /* Threads counter */
struct sockaddr_in sad; /* Server-side socket info */
-int ssd; /* Server socket descriptor */
+int ssd[MAX_PORTS]; /* Server socket descriptors */
+config_t config; /* Config variables */
/* Mutex */
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
@@ -204,7 +219,7 @@
fflush(stdout);
#endif
/* Open bdm db */
- if (sqlite3_open(BDM_DB, &bdm_db)) {
+ if (sqlite3_open(config.bdm_db, &bdm_db)) {
fprintf(stderr, "Can't open database: %s\n", sqlite3_errmsg(bdm_db));
sqlite3_close(bdm_db);
free(tp);
@@ -231,7 +246,7 @@
}
/* Check messages */
- sqlite3_open(MSG_DB, &msg_db);
+ sqlite3_open(config.msg_db, &msg_db);
snprintf(query, MAX_QUERY_LEN, "SELECT rowid,* FROM messages WHERE \"to\"='%s' LIMIT 1;", probe.id);
if ((row = do_query(msg_db, query, 1))) {
mf msg; /* Message row fields */
@@ -275,13 +290,13 @@
char *log = &tp->payload[i + 1]; /* Log data in current packet */
/* Append log to logfile */
- snprintf(logfile, MAX_FILENAME_LEN, "%s/%s.log", LOG_DIR, probe.id);
+ snprintf(logfile, MAX_FILENAME_LEN, "%s/%s.log", config.log_dir, probe.id);
lfp = fopen(logfile, "a");
fprintf(lfp, "%s - %s\n%s\nEND - %s\n", date, probe.param, log, probe.param);
fclose(lfp);
/* Send message to bdm client */
- sqlite3_open(MSG_DB, &msg_db);
+ sqlite3_open(config.msg_db, &msg_db);
snprintf(query, MAX_QUERY_LEN, "INSERT INTO messages ('from', 'to', msg) VALUES('%s','BDM','%s');", probe.id, probe.param);
do_query(msg_db, query, 0);
sqlite3_close(msg_db);
@@ -306,7 +321,7 @@
probe.param[i] = 0;
/* Query target (prefer target with less clients and closest free timestamp) */
- sqlite3_open(MSR_DB, &msr_db);
+ sqlite3_open(config.msr_db, &msr_db);
snprintf(query, MAX_QUERY_LEN, "SELECT t.ip,info,free_ts,curr_cli,max_cli FROM targets AS t, capabilities AS c "
"WHERE t.ip=c.ip AND service='%s' AND cat='%s' AND zone='%s' ORDER BY curr_cli,free_ts ASC LIMIT 1;",
request.type, request.cat, request.zone);
@@ -383,7 +398,7 @@
/* Send reply to client */
if (reply) {
- sendto(ssd, reply, strlen(reply), 0, (struct sockaddr*) &tp->cad, sizeof(tp->cad));
+ sendto(ssd[tp->ssd_idx], reply, strlen(reply), 0, (struct sockaddr*) &tp->cad, sizeof(tp->cad));
/* Post delivery actions */
if (!strncmp(reply, "fwd", 3)) {
@@ -414,87 +429,162 @@
*/
int main(int argc, char *argv[])
{
- unsigned short server_port; /* Server port number */
- pthread_t hThr[MAX_NUM_THREAD]; /* Threads handlers */
- time_t ts; /* Current timestamp */
- char date[25]; /* Date string */
- int i = 0;
+ unsigned short server_ports[MAX_PORTS]; /* Server port numbers */
+ unsigned short num_ports = 0; /* Number of listening ports */
+ int max_sd = 0; /* Biggest socket descriptor */
+ fd_set lset; /* Socket descriptors listening set */
+ fd_set rset; /* Socket descriptors returned by select */
+ pthread_t hThr; /* Thread handler */
+ time_t ts; /* Current timestamp */
+ char date[25]; /* Date string */
+ unsigned int rcv_buff_size = RCV_BUFF_SIZE;
+ int i = 0, j = 0;
/* Command-line check */
- if (argc != 2) {
- fprintf(stderr, "usage: %s <port>\n", argv[0]);
+ if (argc < 2) {
+ fprintf(stderr, "usage: %s <port> [ [port] ... ]\n", argv[0]);
return 0;
}
- /* Port number parsing */
- server_port = atoi(argv[1]);
- if (server_port < 0 || server_port > 65534) {
- fprintf(stderr, "invalid port number: %s\n", argv[1]);
- return 1;
+ /* Get port numbers */
+ for (i=1; i<argc; i++) {
+ server_ports[i-1] = atoi(argv[i]);
+ num_ports++;
}
- /* Server-side socket info initialization */
- memset((char *) &sad, 0, sizeof(sad));
- sad.sin_family = AF_INET;
- sad.sin_addr.s_addr = htonl(INADDR_ANY);
- sad.sin_port = htons(server_port);
+ /* Set config variables */
+ config.var_dir = getenv("VAR_DIR");
+ if (!config.var_dir) config.var_dir = strdup("/var");
+ config.msr_db = malloc(strlen(config.var_dir) + sizeof(MSR_DB) + 3);
+ sprintf(config.msr_db, "%s/%s", config.var_dir, MSR_DB);
+ config.msg_db = malloc(strlen(config.var_dir) + sizeof(MSG_DB) + 3);
+ sprintf(config.msg_db, "%s/%s", config.var_dir, MSG_DB);
+ config.bdm_db = malloc(strlen(config.var_dir) + sizeof(BDM_DB) + 3);
+ sprintf(config.bdm_db, "%s/%s", config.var_dir, BDM_DB);
+ config.log_dir = malloc(strlen(config.var_dir) + sizeof(LOG_DIR) + 3);
+ sprintf(config.log_dir, "%s/%s", config.var_dir, LOG_DIR);
- /* Socket creation */
- if ((ssd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
- fprintf(stderr, "error creating socket\n");
- return 1;
- }
-
- /* Socket binding */
- if (bind(ssd, (struct sockaddr *) &sad, sizeof(sad)) < 0) {
- fprintf(stderr, "bind fallito\n");
- return 1;
- }
-
/* Get timestamp */
ts = time(NULL);
strftime(date, sizeof(date), "%Y/%m/%d %H:%M:%S", localtime(&ts));
/* Output log entry */
- printf("%s - Listening to probe packets on port %d\n", date, server_port);
+ printf("%s - Listening to probe packets on ports: ", date);
fflush(stdout);
+
+ /* Prepare set of listening sockets */
+ FD_ZERO(&lset);
+ for (j=0; j<num_ports; j++) {
+ /* Check port number */
+ if (server_ports[j] < 0 || server_ports[j] > 65534) {
+ fprintf(stderr, "warning: invalid port %u\n", server_ports[j]);
+ continue;
+ }
+ if (server_ports[j] < 1024 && getuid() > 0) {
+ fprintf(stderr, "warning: skipping port %u - root priviledges needed\n", server_ports[j]);
+ continue;
+ }
+ /* Server-side socket info initialization */
+ memset((char *) &sad, 0, sizeof(sad));
+ sad.sin_family = AF_INET;
+ sad.sin_addr.s_addr = htonl(INADDR_ANY);
+ sad.sin_port = htons(server_ports[j]);
+
+ /* Socket creation */
+ if ((ssd[j] = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
+ fprintf(stderr, "warning: unable to create socket %u\n", j);
+ continue;
+ }
+
+ /* Set the receive buffer length */
+ setsockopt(ssd[j], SOL_SOCKET, SO_RCVBUF, &rcv_buff_size, sizeof(rcv_buff_size));
+
+ /* Socket binding */
+ if (bind(ssd[j], (struct sockaddr *) &sad, sizeof(sad)) < 0) {
+ fprintf(stderr, "warning: bind fallito\n");
+ continue;
+ }
+
+ /* Add socket to the set */
+ if (ssd[j] > max_sd)
+ max_sd = ssd[j];
+ FD_SET(ssd[j], &lset);
+
+ /* Output log entry */
+ printf("%u ", server_ports[j]);
+ fflush(stdout);
+ }
+
+ /* Output log entry */
+ printf("\n");
+ fflush(stdout);
+
/* Infinite loop */
while (1) {
char payload[MAX_UDP_PSIZE]; /* Temporary buffer to store payload */
struct sockaddr_in cad; /* Temporary Client-side socket info */
socklen_t cadlen; /* Client-side socket info length */
thp *ntp; /* New thread parameters */
+ struct timeval time_out; /* Select timeout value */
+ int ready_sds; /* Number of detected events */
int bytes;
- /* Listen for probe packets */
- cadlen = sizeof(struct sockaddr_in);
- bytes = recvfrom(ssd, payload, MAX_UDP_PSIZE, 0, (struct sockaddr*) &cad, &cadlen);
- if (bytes <= 0) {
- fprintf(stderr, "error receiving probe packet\n");
- continue;
- }
+ /* Update select parameters */
+ rset = lset;
+ time_out.tv_sec = 5;
+ time_out.tv_usec = 0;
- /* Prepare thread parameters */
- ntp = malloc(sizeof(thp));
- ntp->cad = cad;
- ntp->payload = calloc(1, bytes + 1);
- strncpy(ntp->payload, payload, bytes);
+ /* Wait for events on the socket descriptors set */
+ if ((ready_sds = select(max_sd + 1, &rset, NULL, NULL, &time_out)) < 0) {
+ perror("warning: select failed");
+ continue;
+ }
- /* Check max thread number */
- if (num_thread >= MAX_NUM_THREAD) {
- fprintf(stderr, "max connections reached: refusing connection from %s\n", (char *) inet_ntoa(cad.sin_addr));
- free(ntp);
- } else {
- /* Create thread to handle the connection */
- if (pthread_create(&hThr[i], NULL, doit, ntp) < 0) {
- fprintf(stderr, "error creating thread\n");
- } else {
- pthread_detach(hThr[i]);
+ /* Timeout expired */
+ if (ready_sds == 0)
+ continue;
+
+ /* Check all sockets for events */
+ for (j=0; j<num_ports; j++) {
+ if (FD_ISSET(ssd[j], &rset)) {
+ /* Read packet from socket */
+ cadlen = sizeof(struct sockaddr_in);
+ bytes = recvfrom(ssd[j], payload, MAX_UDP_PSIZE, 0, (struct sockaddr*) &cad, &cadlen);
+ if (bytes <= 0) {
+ fprintf(stderr, "error receiving probe packet\n");
+ continue;
+ }
+
+ /* Prepare thread parameters */
+ ntp = malloc(sizeof(thp));
+ ntp->cad = cad;
+ ntp->payload = calloc(1, bytes + 1);
+ strncpy(ntp->payload, payload, bytes);
+ ntp->ssd_idx = j;
+
+ /* Check max thread number */
+ if (num_thread >= MAX_NUM_THREAD) {
+ char reply[MAX_QUERY_LEN];
+
+ sprintf(reply, "pong %s %lu\n", inet_ntoa(cad.sin_addr), time(NULL));
+ sendto(ssd[j], reply, strlen(reply), 0, (struct sockaddr*) &cad, sizeof(cad));
+ fprintf(stderr, "max threads reached: quick reply to %s\n", (char *) inet_ntoa(cad.sin_addr));
+ free(ntp);
+ } else {
+ /* Create thread to handle the connection */
+ if (pthread_create(&hThr, NULL, doit, ntp) < 0) {
+ fprintf(stderr, "warning: unable to create thread\n");
+ } else {
+ pthread_detach(hThr);
+ }
+ }
}
}
}
/* Close socket */
- close(ssd);
+ for (j=0; j<num_ports; j++) {
+ close(ssd[j]);
+ }
}
^ permalink raw reply [flat|nested] only message in thread
only message in thread, other threads:[~2011-05-24 14:52 UTC | newest]
Thread overview: (only message) (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2011-05-24 15:10 [Bismark-commits] rev 343 - trunk/server/src walter
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox