/* * This file is part of QCluster. * * QCluster is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * QCluster is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with QCluster; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA * * Peter Harper * * $Id: qcluster.c,v 1.1 2003/12/02 06:17:38 rmello Exp $ */ static char rcsid[] = "@(#) $Id: qcluster.c,v 1.1 2003/12/02 06:17:38 rmello Exp $"; #include "ns.h" #include "cluster.h" #include "clustercomms.h" #include "message.h" #include "locks.h" int Ns_ModuleVersion = 1; Ns_Mutex g_recover_mutex; Ns_Mutex g_send_mutex; /* * Static procedures. */ static Ns_SockProc q_AcceptProc; static void q_ConnectThread(void *arg); static void q_ServerThread(void *arg); static int q_compare_server_names(const void *, const void *); static void q_shutdown(void); /* *---------------------------------------------------------------------- * * Ns_ModuleInit -- * * Load the config parameters, setup the structures, and * listen on the control port. * * Results: * None. * * Side effects: * Server will listen for control connections on specified * address and port. * *---------------------------------------------------------------------- */ NS_EXPORT int Ns_ModuleInit(char *server, char *module) { char tmp_ip_str[Q_MAX_NAME_SIZE]; char tmp_name_str[Q_MAX_NAME_SIZE]; int tmp_port; char *path; char *pass; char *key; char *val; char *group_name_ptr; int i; int count; int status_count; int group_id; int new_flag; SOCKET lsock; Ns_Set *set; Tcl_HashEntry *entry_ptr; Tcl_HashEntry *search_ptr; Tcl_HashTable groups; Tcl_HashSearch search; struct q_server* server_ptr; struct q_group* group_ptr; Ns_Log(Notice, "qcluster: initialising"); Ns_MutexInit(&g_send_mutex); /* * Initialise the shared memory (locks) module and any global locks * and hash tables. */ Tcl_InitHashTable(&g_msg_types, TCL_ONE_WORD_KEYS); Tcl_InitHashTable(&g_cluster, TCL_STRING_KEYS); Ns_RWLockInit(&g_msg_types_mutex); Ns_RWLockInit(&g_cluster_mutex); Ns_MutexInit(&g_recover_mutex); q_init_lock(); /* * Read the default values for local address and port for the * listener socket. Note that these values may be overridden by * an entry for this server in the cluster entries that specifies * a specific port. */ path = Ns_ConfigGetPath(server, module, NULL); if ( ((g_addr = Ns_ConfigGet(path, "address")) == NULL) || (!Ns_ConfigGetInt(path, "port", &g_port)) || ((g_iam = Ns_ConfigGet(path, "iam")) == NULL) ) { Ns_Log(Error, "qcluster: address must be specified in config"); return NS_ERROR; } /* * Read the groups from the config file. */ Tcl_InitHashTable(&groups, TCL_STRING_KEYS); path = Ns_ConfigGetPath(server, module, "groups", NULL); set = Ns_ConfigGetSection(path); group_id = 0; for (i = 0; set != NULL && i < Ns_SetSize(set); ++i) { pass = NULL; key = Ns_SetKey(set, i); group_name_ptr = Ns_SetValue(set, i); if (!STRIEQ(key, "group")) { Ns_Log(Error, "qcluster: invalid groups key: %s", key); continue; } else { /* * Add the group entry to the hash table. * lock for the structure in the locks table. */ entry_ptr = Tcl_CreateHashEntry(&groups, group_name_ptr, &new_flag); Tcl_SetHashValue(entry_ptr, (void *) group_id); group_id++; } /* End "if key = group" else. */ } if (groups.numEntries == 0) { Ns_Log(Warning, "qcluster: No groups specified"); } /* * Read the list of cluster members. Initialise the hash table for * each member found. */ g_server = NULL; path = Ns_ConfigGetPath(server, module, "cluster", NULL); set = Ns_ConfigGetSection(path); g_cluster_size = 0; for (i = 0; set != NULL && i < Ns_SetSize(set); ++i) { pass = NULL; key = Ns_SetKey(set, i); val = Ns_SetValue(set, i); if (!STRIEQ(key, "member")) { Ns_Log(Error, "qcluster: invalid cluster key: %s", key); continue; } else { count = sscanf(val, "%[^:]:%[^:]:%d", tmp_ip_str, tmp_name_str, &tmp_port); if (count == 0) { Ns_Log(Error, "qcluster: invalid cluster value: %s", key); Ns_Log(Error, "qcluster: ipaddress[:name][:port]"); continue; } /* * Create structure for this host. */ server_ptr = ns_malloc(sizeof(struct q_server)); strcpy(server_ptr->svr_ip, tmp_ip_str); Ns_MutexInit(&server_ptr->c_mutex); Ns_MutexInit(&server_ptr->s_mutex); server_ptr->c_sock = -1; server_ptr->s_sock = -1; /* * Add this server to the array of servers. */ g_cluster_order[g_cluster_size] = server_ptr; g_cluster_size++; /* * If a server name wasn't given for this entry, just use the * textual version of the IP address. Otherwise, copy in the * given name. * * If a server port wasn't given for this entry, use the default * port as specified in the main qcluster section (read above). */ if (count > 1) { strcpy(server_ptr->svr_name, tmp_name_str); } else { strcpy(server_ptr->svr_name, tmp_ip_str); } if (count > 2) { server_ptr->l_port = tmp_port; } else { server_ptr->l_port = g_port; } /* * Check whether the server name matches this one. If so, * set the flag to indicate that this server structure corresponds * to us. Also, reset the listening port for this instance (just * in case the specified port in this server's entry is different to * the default). */ if (!strcmp(server_ptr->svr_name, g_iam)) { server_ptr->iam_p = 1; g_port = server_ptr->l_port; g_server = server_ptr; } else { server_ptr->iam_p = 0; } /* * Loop through each configured group name and initialise the * groups hash table for this server. */ Tcl_InitHashTable(&server_ptr->groups_ht, TCL_STRING_KEYS); search_ptr = Tcl_FirstHashEntry(&groups, &search); while (search_ptr != NULL) { group_name_ptr = Tcl_GetHashKey(&groups, search_ptr); /* * Create the new group structure, initialise its values, and * add it to the servers groups hash table. */ group_ptr = ns_malloc(sizeof(struct q_group)); strcpy(group_ptr->grp_name, group_name_ptr); group_ptr->grp_id = (int) Tcl_GetHashValue(search_ptr); group_ptr->can_process_p = 0; Ns_CondInit(&group_ptr->event); Ns_MutexInit(&group_ptr->event_lock); Tcl_InitHashTable(&group_ptr->queue_ht, TCL_STRING_KEYS); group_ptr->queue_ll = NULL; for (status_count = 0; status_count < Q_MSG_STATUS_MAX; status_count++) { group_ptr->queue_by_stat_ll[status_count] = NULL; } entry_ptr = Tcl_CreateHashEntry(&server_ptr->groups_ht, group_name_ptr, &new_flag); Tcl_SetHashValue(entry_ptr, group_ptr); q_add_lock(group_ptr); search_ptr = Tcl_NextHashEntry(&search); } /* * Add the new server entry into the hash table, and create the * lock for the structure in the locks table. */ entry_ptr = Tcl_CreateHashEntry(&g_cluster, server_ptr->svr_name, &new_flag); Tcl_SetHashValue(entry_ptr, server_ptr); q_add_lock(server_ptr); Ns_Log(Notice, "qcluster: added host: %s", server_ptr->svr_name); } /* End "if key = member" else. */ } if (g_cluster.numEntries == 0) { Ns_Log(Warning, "qcluster: No cluster members"); } /* * Check that the iam server entry existed in the cluster list. */ if (g_server == NULL) { Ns_Log(Error, "qcluster: iam server name must exist in the cluster list. Please check you're configuration."); return NS_ERROR; } /* * Sort the server array such that we know who our nearest neighbour is. */ qsort(g_cluster_order, g_cluster_size, sizeof(struct q_server *), q_compare_server_names); /* * Work out where we are in the cluster list and set the g_this_server_pos * global variable. */ for (i = 0; i < g_cluster_size; i++) { if (g_cluster_order[i] == g_server) { g_this_server_pos = i; } } /* * Start the listening socket. */ lsock = Ns_SockListen(g_addr, g_port); if (lsock == INVALID_SOCKET) { Ns_Log(Error, "qcluster: could not listen on %s:%d", g_addr, g_port); return NS_ERROR; } Ns_Log(Notice, "qcluster: listening on %s:%d", g_addr, g_port); // Don't know what this does!? // Ns_RegisterProcInfo(AcceptProc, "qcluster", ArgProc); Ns_SockCallback(lsock, q_AcceptProc, g_server, NS_SOCK_READ|NS_SOCK_EXIT); /* * Loop through each server entry, and assuming the entry doesn't * correspond to this server, start a thread to attempt and manage * the connection to the remote host. */ Ns_RWLockRdLock(&g_cluster_mutex); search_ptr = Tcl_FirstHashEntry(&g_cluster, &search); while (search_ptr != NULL) { server_ptr = Tcl_GetHashValue(search_ptr); q_get_rlock(server_ptr); if (!server_ptr->iam_p) { Ns_ThreadCreate(q_ConnectThread, (void *) server_ptr, 0, NULL); } q_release_lock(server_ptr); search_ptr = Tcl_NextHashEntry(&search); } Ns_RWLockUnlock(&g_cluster_mutex); g_qcluster_started = 1; return NS_OK; } static int q_compare_server_names(const void *ptr1, const void *ptr2) { return strcmp((*(struct q_server **)ptr1)->svr_name, (*(struct q_server **)ptr2)->svr_name); } int server_recv(int sock, void *msg_buf, int msg_size, int flags) { int read_len; int read_val; read_len = 0; read_val = 0; while (read_len < msg_size) { if ((read_val = recv(sock, msg_buf + read_len, msg_size - read_len, flags)) <= 0) { read_len = -1; break; } else { read_len += read_val; } } return read_len; } /* *---------------------------------------------------------------------- * * q_AcceptProc -- * * Socket callback to accept a new connection. * * Results: * NS_TRUE to keep listening unless shutdown is in progress. * * Side effects: * New EvalThread will be created. * *---------------------------------------------------------------------- */ static int q_AcceptProc(SOCKET lsock, void *object_ptr, int why) { int n; int len; int s_sock; static int next; struct q_server *this_server_ptr; struct q_server *named_server_ptr; struct sockaddr sa; int msg_header[2]; int msg_id; int msg_size; char server_name[Q_MAX_NAME_SIZE]; Tcl_HashEntry *entry; this_server_ptr = (struct q_server *)object_ptr; if (why == NS_SOCK_EXIT) { Ns_Log(Notice, "qcluster: shutdown"); q_shutdown(); ns_sockclose(lsock); return NS_FALSE; } len = sizeof(sa); s_sock = Ns_SockAccept(lsock, &sa, &len); if (s_sock == INVALID_SOCKET) { Ns_Log(Error, "qcluster: accept() failed: %s, %d", ns_sockstrerror(ns_sockerrno), ns_sockerrno); } else { /* * Read header of identification message: * 4 octets: Message Id * 4 octets: Message Size (excluding header) */ if ((n = server_recv(s_sock, msg_header, sizeof(msg_header), 0)) <= 0) { Ns_Log(Error, "qcluster: header receive error: %s", ns_sockstrerror(ns_sockerrno)); ns_sockclose(s_sock); goto error_exit; } msg_id = ntohl(msg_header[0]); msg_size = ntohl(msg_header[1]); if (DEBUG) { Ns_Log(Notice, "qcluster: header received (%d, %d)", msg_id, msg_size); } /* * Check that we've received the correct header format. */ if (msg_id != Q_MSG_TYPE_ID) { Ns_Log(Error, "qcluster: Incorrect header message id: %d", msg_id); ns_sockclose(s_sock); goto error_exit; } /* * Receive the name of the remote connection. Remeber to terminate * the localy stored copy with a zero. */ if ((n = server_recv(s_sock, server_name, msg_size, 0)) <= 0) { Ns_Log(Error, "qcluster: server identifier message receive error: %s", ns_sockstrerror(ns_sockerrno)); ns_sockclose(s_sock); goto error_exit; } server_name[msg_size] = '\0'; /* * Try and find this server in the cluster hashtable. Once found, * Update the connection details and kick off a server thread to * service this connection. */ Ns_RWLockRdLock(&g_cluster_mutex); entry = Tcl_FindHashEntry(&g_cluster, server_name); Ns_RWLockUnlock(&g_cluster_mutex); if (entry == NULL) { Ns_Log(Error, "qcluster: server_name %s unknown", server_name); ns_sockclose(s_sock); goto error_exit; } named_server_ptr = Tcl_GetHashValue(entry); q_get_wlock(named_server_ptr); if (named_server_ptr->iam_p) { q_release_lock(named_server_ptr); Ns_Log(Error, "qcluster: Cannot accept connections from oneself!"); ns_sockclose(s_sock); goto error_exit; } named_server_ptr->s_sock = s_sock; memcpy(&named_server_ptr->s_sa, &sa, sizeof(sa)); q_release_lock(named_server_ptr); Ns_ThreadCreate(q_ServerThread, (void *) named_server_ptr, 0, NULL); } error_exit: return NS_TRUE; } /* *---------------------------------------------------------------------- * * q_shutdown -- * * Results: * None. * *---------------------------------------------------------------------- */ static void q_shutdown(void) { int count; struct q_server *server_ptr; for (count = 0; count < g_cluster_size; count++) { server_ptr = g_cluster_order[count]; if (server_ptr != g_server) { q_get_rlock(server_ptr); ns_sockclose(server_ptr->c_sock); ns_sockclose(server_ptr->s_sock); q_release_lock(server_ptr); } } } /* *---------------------------------------------------------------------- * * q_ConnectThread -- * * Thread to connect to remote host. * * Results: * None. * *---------------------------------------------------------------------- */ static void q_ConnectThread(void *arg) { struct q_server *server_ptr; int port; char svr_name[Q_MAX_NAME_SIZE]; char svr_ip[Q_MAX_NAME_SIZE]; char this_svr_name[Q_MAX_NAME_SIZE]; int sock; int n; int msg_header[2]; int msg_id; int msg_size; char msg_buf[Q_MAX_MSG_SIZE]; Ns_Mutex *c_mutex; fd_set set; SOCKET sock_pipe[2]; int write_len; int max; unsigned char dummy_byte; server_ptr = (struct q_server *) arg; /* * Log the starting of this thread, and make thread local copies of the * remote server's IP address and listening port. */ q_get_rlock(server_ptr); Ns_Log(Notice, "qcluster: client connect thread starting for %s", server_ptr->svr_name); port = server_ptr->l_port; strcpy(svr_ip, server_ptr->svr_ip); strcpy(svr_name, server_ptr->svr_name); c_mutex = &server_ptr->c_mutex; q_release_lock(server_ptr); q_get_rlock(g_server); strcpy(this_svr_name, g_server->svr_name); q_release_lock(g_server); while (1) { // Ns_Log(Notice, "connect: Waiting for %s lock", server_ptr->svr_name); Ns_MutexLock(c_mutex); // Ns_Log(Notice, "connect: Got for %s lock", server_ptr->svr_name); sock = Ns_SockTimedConnect(svr_ip, port, 5); if (sock == INVALID_SOCKET) { Ns_Log(Notice, "qcluster: Connection to %s:%s:%d failed.", svr_ip, svr_name, port); Ns_MutexUnlock(c_mutex); sleep(1); continue; } /* * Use the connected socket. */ Ns_Log(Notice, "qcluster: Connection to %s:%s:%d succeeded. (%x->%d)", svr_ip, svr_name, port, server_ptr, sock); q_get_rlock(server_ptr); server_ptr->c_sock = sock; q_release_lock(server_ptr); /* * Send the header identification message. */ msg_header[0] = htonl(Q_MSG_TYPE_ID); msg_header[1] = htonl(strlen(this_svr_name)); send(sock, msg_header, sizeof(msg_header), 0); send(sock, this_svr_name, strlen(this_svr_name), 0); Ns_MutexUnlock(c_mutex); /* * Dump copies of our message queues to the newly attached host. */ q_comms_server_connect_dump_messages(server_ptr); /* * Loop whilst still connected. */ while (1) { if ((n = server_recv(sock, msg_header, sizeof(msg_header), 0)) <= 0) { Ns_Log(Error, "qcluster: header receive error: %s", ns_sockstrerror(ns_sockerrno)); ns_sockclose(sock); break; } msg_id = ntohl(msg_header[0]); msg_size = ntohl(msg_header[1]); if ((n = server_recv(sock, msg_buf, msg_size, 0)) <= 0) { Ns_Log(Error, "qcluster: message content receive error: %s", ns_sockstrerror(ns_sockerrno)); ns_sockclose(sock); break; } q_comms_process(server_ptr, msg_id, msg_buf, msg_size); } q_get_wlock(server_ptr); server_ptr->c_sock = -1; // ns_sockclose(server_ptr->s_sock); // server_ptr->s_sock = -1; q_release_lock(server_ptr); Ns_Log(Error, "qcluster: Connection to host %s terminated", svr_name); /* * Recover the server, getting the lock such that we only recover * one server at a time. This prevents any lock contentions during * the recover process. */ Ns_MutexLock(&g_recover_mutex); q_comms_recover_server(server_ptr); Ns_MutexUnlock(&g_recover_mutex); Ns_Log(Notice, "connect: Recovering complete for %s", server_ptr->svr_name); } } /* *---------------------------------------------------------------------- * * q_ServerThread -- * * Thread to handle incoming connection from remote host. * * Results: * None. * *---------------------------------------------------------------------- */ static void q_ServerThread(void *arg) { struct q_server *server_ptr; char svr_name[Q_MAX_NAME_SIZE]; char this_svr_name[Q_MAX_NAME_SIZE]; int sock; int n; int msg_header[2]; int msg_id; int msg_size; char msg_buf[Q_MAX_MSG_SIZE]; server_ptr = (struct q_server *) arg; /* * Log the starting of this thread, and make thread local copies of the * remote server's IP address and listening port. */ q_get_rlock(server_ptr); Ns_Log(Notice, "qcluster: server connect thread starting for %s", server_ptr->svr_name); strcpy(svr_name, server_ptr->svr_name); sock = server_ptr->s_sock; q_release_lock(server_ptr); q_get_rlock(g_server); strcpy(this_svr_name, g_server->svr_name); q_release_lock(g_server); while (1) { if ((n = server_recv(sock, msg_header, sizeof(msg_header), 0)) <= 0) { Ns_Log(Error, "qcluster: header receive error: %s", ns_sockstrerror(ns_sockerrno)); ns_sockclose(sock); break; } msg_id = ntohl(msg_header[0]); msg_size = ntohl(msg_header[1]); if ((n = server_recv(sock, msg_buf, msg_size, 0)) <= 0) { Ns_Log(Error, "qcluster: message content receive error: %s", ns_sockstrerror(ns_sockerrno)); ns_sockclose(sock); break; } q_comms_process(server_ptr, msg_id, msg_buf, msg_size); } q_get_wlock(server_ptr); server_ptr->s_sock = -1; // ns_sockclose(server_ptr->c_sock); // server_ptr->c_sock = -1; q_release_lock(server_ptr); Ns_Log(Error, "qcluster: Connection from host %s terminated", svr_name); Ns_ThreadExit (0); }