/* * This file is part of QCluster. * * QCluster is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * QCluster is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with QCluster; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA * * Peter Harper * * $Id: clustercomms.c,v 1.1 2003/12/02 06:17:38 rmello Exp $ * * ------------------------------------------------------------------------ * * Message processing support for inter-qcluster server messages. */ static char rcsid[] = "@(#) $Id: clustercomms.c,v 1.1 2003/12/02 06:17:38 rmello Exp $"; #include "ns.h" #include "cluster.h" #include "message.h" #include "locks.h" #include "clustercomms.h" int server_send(struct q_server *server_ptr, const void *msg_buf, int msg_size) { int write_len; int write_val; write_len = 0; write_val = 0; Ns_MutexLock(&server_ptr->c_mutex); while (write_len < msg_size) { if ((write_val = send(server_ptr->c_sock, msg_buf + write_len, msg_size - write_len,0)) <= 0) { write_len = -1; break; } else { write_len += write_val; } } Ns_MutexUnlock(&server_ptr->c_mutex); return write_len; } #define Q_ALIGNED_SIZE(s) (s + (sizeof(int) - (s % sizeof(int)))) static void q_comms_handle_msg_add(struct q_server *server_ptr, char *msg_buf, int msg_size); static void q_comms_handle_msg_delegate(struct q_server *server_ptr, char *msg_buf, int msg_size); static void q_comms_handle_status_update_msg(struct q_server *server_ptr, char *msg_buf, int msg_size); static void q_comms_handle_can_process_msg(struct q_server *server_ptr, char *msg_buf, int msg_size); static void q_comms_handle_delete_recovered_msg(struct q_server *server_ptr, char *msg_buf, int msg_size); static void q_comms_build_msg_add_msg(unsigned char *msg_buf, int *msg_size_ptr, struct q_message *msg_ptr); static void q_comms_build_can_process_msg(unsigned char *msg_buf, int *msg_size_ptr, struct q_group *group_ptr); static void q_comms_build_delegate_msg(unsigned char *msg_buf, int *msg_size_ptr, struct q_message *msg_ptr, int cur_hops); /* *---------------------------------------------------------------------- * * q_comms_process -- * * Processes an incoming message on a cluster comms link. * * Results: * None. * *---------------------------------------------------------------------- */ void q_comms_process (struct q_server *server_ptr, int msg_id, void *msg_buf, int msg_size) { Ns_Log(Notice, "qcluster: Message %d received of size %d.", msg_id, msg_size); switch (msg_id) { case Q_MSG_TYPE_ID: Ns_Log(Error, "qcluster: Unexpected server identification message, ignoring"); break; case Q_MSG_TYPE_MSG_ADD: q_comms_handle_msg_add(server_ptr, msg_buf, msg_size); break; case Q_MSG_TYPE_MSG_DELEGATE: q_comms_handle_msg_delegate(server_ptr, msg_buf, msg_size); break; case Q_MSG_TYPE_MSG_STATUS_UPDATE: q_comms_handle_status_update_msg(server_ptr, msg_buf, msg_size); break; case Q_MSG_TYPE_MSG_CAN_PROCESS: q_comms_handle_can_process_msg(server_ptr, msg_buf, msg_size); break; case Q_MSG_TYPE_MSG_DELETE_RECOVERED: q_comms_handle_delete_recovered_msg(server_ptr, msg_buf, msg_size); break; default: Ns_Log(Error, "qcluster: Unknown server message from %d (%d), ignoring", server_ptr->svr_name, msg_id); break; } } /* *---------------------------------------------------------------------- * * q_comms_handle_msg_add -- * * Processes an incoming Q_MSG_TYPE_MSG_ADD message. * * Results: * None. * *---------------------------------------------------------------------- */ static void q_comms_handle_msg_add(struct q_server *server_ptr, char *msg_buf, int msg_size) { int msg_id_len; int grp_name_len; char msg_id[Q_MAX_NAME_SIZE]; char grp_name[Q_MAX_NAME_SIZE]; struct q_group *group_ptr; int payload_size; char decoded_msg[Q_MAX_MSG_SIZE]; int decoded_size; int msg_type_id; struct q_message_type *msg_type_ptr; int msg_status; int msg_pos; // Ns_Log(Notice, "q_comms_handle_msg_add: entering"); msg_pos = 0; /* get message id */ msg_id_len = ntohl(*((int *)&msg_buf[msg_pos])); msg_pos += sizeof(int); memcpy(msg_id, &msg_buf[msg_pos], msg_id_len); msg_id[msg_id_len] = '\0'; msg_pos += Q_ALIGNED_SIZE(msg_id_len); /* get group name */ grp_name_len = ntohl(*((int *)&msg_buf[msg_pos])); msg_pos += sizeof(int); memcpy(grp_name, &msg_buf[msg_pos], grp_name_len); grp_name[grp_name_len] = '\0'; msg_pos += Q_ALIGNED_SIZE(grp_name_len); /* get type id */ msg_type_id = ntohl(*((int *)&msg_buf[msg_pos])); msg_pos += sizeof(int); /* get message status */ msg_status = ntohl(*((int *)&msg_buf[msg_pos])); msg_pos += sizeof(int); /* * Lookup the message type. */ msg_type_ptr = q_get_msg_type_ptr(msg_type_id); if (msg_type_ptr == NULL) { Ns_Log(Error, "Unknown message type id (%d) in q_comms_handle_msg_add, ignoring message", msg_type_id); return; } /* * Lookup group. */ group_ptr = q_get_group_ptr(server_ptr, grp_name); if (group_ptr == NULL) { Ns_Log(Error, "Unknown group name (%s) in q_comms_handle_msg_add, ignoring message", grp_name); return; } /* get payload size */ payload_size = ntohl(*((int *)&msg_buf[msg_pos])); msg_pos += sizeof(int); /* get the payload */ msg_type_ptr->construct_in_func(&msg_buf[msg_pos], payload_size, decoded_msg, &decoded_size); /* * Call the function to add the message */ q_remote_queue_msg(msg_type_ptr, group_ptr, msg_id, msg_status, decoded_msg, decoded_size); // Ns_Log(Notice, "q_comms_handle_msg_add: exiting"); } /* *---------------------------------------------------------------------- * * q_comms_handle_msg_delegate -- * * Processes an incoming Q_MSG_TYPE_MSG_DELEGATE message. * * Results: * None. * *---------------------------------------------------------------------- */ static void q_comms_handle_msg_delegate(struct q_server *server_ptr, char *msg_buf, int msg_size) { int msg_id_len; int grp_name_len; char msg_id[Q_MAX_NAME_SIZE]; char grp_name[Q_MAX_NAME_SIZE]; struct q_group *group_ptr; int payload_size; char decoded_msg[Q_MAX_MSG_SIZE]; int decoded_size; int msg_type_id; struct q_message_type *msg_type_ptr; int msg_status; int msg_pos; int max_hops; // Ns_Log(Notice, "q_comms_handle_msg_delegate: entering"); msg_pos = 0; /* get message id */ msg_id_len = ntohl(*((int *)&msg_buf[msg_pos])); msg_pos += sizeof(int); memcpy(msg_id, &msg_buf[msg_pos], msg_id_len); msg_id[msg_id_len] = '\0'; msg_pos += Q_ALIGNED_SIZE(msg_id_len); /* get group name */ grp_name_len = ntohl(*((int *)&msg_buf[msg_pos])); msg_pos += sizeof(int); memcpy(grp_name, &msg_buf[msg_pos], grp_name_len); grp_name[grp_name_len] = '\0'; msg_pos += Q_ALIGNED_SIZE(grp_name_len); /* get type id */ msg_type_id = ntohl(*((int *)&msg_buf[msg_pos])); msg_pos += sizeof(int); /* get message status */ msg_status = ntohl(*((int *)&msg_buf[msg_pos])); msg_pos += sizeof(int); /* get max hops */ max_hops = ntohl(*((int *)&msg_buf[msg_pos])); msg_pos += sizeof(int); /* * Lookup the message type. */ msg_type_ptr = q_get_msg_type_ptr(msg_type_id); if (msg_type_ptr == NULL) { Ns_Log(Error, "Unknown message type id (%d) in q_comms_handle_msg_delegate, ignoring message", msg_type_id); return; } /* * Lookup group. */ group_ptr = q_get_group_ptr(g_server, grp_name); if (group_ptr == NULL) { Ns_Log(Error, "Unknown group name (%s) in q_comms_handle_msg_delegate, ignoring message", grp_name); return; } /* get payload size */ payload_size = ntohl(*((int *)&msg_buf[msg_pos])); msg_pos += sizeof(int); /* get the payload */ msg_type_ptr->construct_in_func(&msg_buf[msg_pos], payload_size, decoded_msg, &decoded_size); /* * Call the q_queue_msg function to try and queue the delegated message. */ q_queue_msg(msg_type_ptr, group_ptr, msg_id, msg_status, decoded_msg, decoded_size, max_hops); // Ns_Log(Notice, "q_comms_handle_msg_delegate: exiting"); return; } /* *---------------------------------------------------------------------- * * q_comms_handle_status_update_msg -- * * Processes an incoming Q_MSG_TYPE_STATUS_UPDATE_MSG message. * * Results: * None. * *---------------------------------------------------------------------- */ static void q_comms_handle_status_update_msg(struct q_server *server_ptr, char *msg_buf, int msg_size) { int msg_id_len; int grp_name_len; char msg_id[Q_MAX_NAME_SIZE]; char grp_name[Q_MAX_NAME_SIZE]; struct q_group *group_ptr; int payload_size; char decoded_msg[Q_MAX_MSG_SIZE]; int decoded_size; int msg_status; int msg_pos; // Ns_Log(Notice, "q_comms_handle_status_update_msg: entering"); msg_pos = 0; /* get message id */ msg_id_len = ntohl(*((int *)&msg_buf[msg_pos])); msg_pos += sizeof(int); memcpy(msg_id, &msg_buf[msg_pos], msg_id_len); msg_id[msg_id_len] = '\0'; msg_pos += Q_ALIGNED_SIZE(msg_id_len); /* get group name */ grp_name_len = ntohl(*((int *)&msg_buf[msg_pos])); msg_pos += sizeof(int); memcpy(grp_name, &msg_buf[msg_pos], grp_name_len); grp_name[grp_name_len] = '\0'; msg_pos += Q_ALIGNED_SIZE(grp_name_len); /* get message status */ msg_status = ntohl(*((int *)&msg_buf[msg_pos])); msg_pos += sizeof(int); /* * Lookup group. */ group_ptr = q_get_group_ptr(server_ptr, grp_name); if (group_ptr == NULL) { Ns_Log(Error, "q_comms_handle_status_update_msg: Unknown group name (%s), ignoring message", grp_name); return; } q_remote_msg_status_update(msg_id, group_ptr, msg_status); // Ns_Log(Notice, "q_comms_handle_status_update_msg: exiting"); return; } /* *---------------------------------------------------------------------- * * q_comms_handle_delete_recovered_msg -- * * Processes an incoming Q_MSG_TYPE_DELETE_RECOVERED message. * * Results: * None. * *---------------------------------------------------------------------- */ static void q_comms_handle_delete_recovered_msg(struct q_server *server_ptr, char *msg_buf, int msg_size) { int msg_id_len; int grp_name_len; int svr_name_len; char msg_id[Q_MAX_NAME_SIZE]; char grp_name[Q_MAX_NAME_SIZE]; char svr_name[Q_MAX_NAME_SIZE]; struct q_group *group_ptr; int msg_status; int msg_pos; int server_pos; int count; // Ns_Log(Notice, "q_comms_handle_delete_recovered_msg: entering"); msg_pos = 0; /* get message id */ msg_id_len = ntohl(*((int *)&msg_buf[msg_pos])); msg_pos += sizeof(int); memcpy(msg_id, &msg_buf[msg_pos], msg_id_len); msg_id[msg_id_len] = '\0'; msg_pos += Q_ALIGNED_SIZE(msg_id_len); /* get server name */ svr_name_len = ntohl(*((int *)&msg_buf[msg_pos])); msg_pos += sizeof(int); memcpy(svr_name, &msg_buf[msg_pos], svr_name_len); svr_name[svr_name_len] = '\0'; msg_pos += Q_ALIGNED_SIZE(svr_name_len); /* get group name */ grp_name_len = ntohl(*((int *)&msg_buf[msg_pos])); msg_pos += sizeof(int); memcpy(grp_name, &msg_buf[msg_pos], grp_name_len); grp_name[grp_name_len] = '\0'; msg_pos += Q_ALIGNED_SIZE(grp_name_len); /* * Lookup server */ server_pos = -1; for (count = 0; count < g_cluster_size; count++) { if (!strcmp(svr_name, g_cluster_order[count]->svr_name)) { server_pos = count; } } if (server_pos == -1) { Ns_Log(Error, "q_comms_handle_delete_recovered_msg: Unknown server name (%s), ignoring message", svr_name); return; } /* * Lookup group. */ group_ptr = q_get_group_ptr(g_cluster_order[server_pos], grp_name); if (group_ptr == NULL) { Ns_Log(Error, "q_comms_handle_delete_recovered_msg: Unknown group name (%s), ignoring message", grp_name); return; } q_remote_msg_status_update(msg_id, group_ptr, Q_MSG_STATUS_COMPLETE); // Ns_Log(Notice, "q_comms_handle_delete_recovered_msg: exiting"); return; } /* *---------------------------------------------------------------------- * * q_comms_handle_can_process_msg -- * * Processes an incoming Q_MSG_TYPE_CAN_PROCESS message. * * Results: * None. * *---------------------------------------------------------------------- */ static void q_comms_handle_can_process_msg(struct q_server *server_ptr, char *msg_buf, int msg_size) { int grp_name_len; char grp_name[Q_MAX_NAME_SIZE]; struct q_group *group_ptr; int can_process_val; int msg_pos; // Ns_Log(Notice, "q_comms_handle_can_process_msg: entering"); msg_pos = 0; /* get group name */ grp_name_len = ntohl(*((int *)&msg_buf[msg_pos])); msg_pos += sizeof(int); memcpy(grp_name, &msg_buf[msg_pos], grp_name_len); grp_name[grp_name_len] = '\0'; msg_pos += Q_ALIGNED_SIZE(grp_name_len); /* get can_process value */ can_process_val = ntohl(*((int *)&msg_buf[msg_pos])); msg_pos += sizeof(int); /* * Lookup group. */ group_ptr = q_get_group_ptr(server_ptr, grp_name); if (group_ptr == NULL) { Ns_Log(Error, "q_comms_handle_can_process_msg: Unknown group name (%s), ignoring message", grp_name); return; } q_remote_set_can_process_val(server_ptr, group_ptr, can_process_val); // Ns_Log(Notice, "q_comms_handle_can_process_msg: exiting"); return; } /* *---------------------------------------------------------------------- * * q_comms_build_msg_add_msg -- * * Builds a Q_MSG_TYPE_MSG_ADD message. * * Results: * NS_TRUE * *---------------------------------------------------------------------- */ static void q_comms_build_msg_add_msg(unsigned char *msg_buf, int *msg_size_ptr, struct q_message *msg_ptr) { int payload_size; int msg_size; /* * Construct the message to be sent. * Message is of type Q_MSG_TYPE_MSG_ADD: */ msg_size = 0; /* Message id */ *((int *)&msg_buf[msg_size]) = htonl(Q_MSG_TYPE_MSG_ADD); msg_size+= sizeof(int); /* Message size (skip this and fill in at the end */ msg_size+= sizeof(int); /* message id */ *((int *)&msg_buf[msg_size]) = htonl(strlen(msg_ptr->msg_id)); msg_size+= sizeof(int); strcpy(&msg_buf[msg_size], msg_ptr->msg_id); msg_size+= Q_ALIGNED_SIZE(strlen(msg_ptr->msg_id)); /* group name size, and group name */ *((int *)&msg_buf[msg_size]) = htonl(strlen(msg_ptr->grp_ptr->grp_name)); msg_size+= sizeof(int); strcpy(&msg_buf[msg_size], msg_ptr->grp_ptr->grp_name); msg_size+= Q_ALIGNED_SIZE(strlen(msg_ptr->grp_ptr->grp_name)); /* message type id */ *((int *)&msg_buf[msg_size]) = htonl(msg_ptr->type_ptr->msg_type_id); msg_size+= sizeof(int); /* message status */ *((int *)&msg_buf[msg_size]) = htonl(msg_ptr->status); msg_size+= sizeof(int); /* message payload */ msg_ptr->type_ptr->construct_out_func(msg_ptr->msg_ptr, msg_ptr->msg_size, &msg_buf[msg_size+sizeof(int)], &payload_size); *((int *)&msg_buf[msg_size]) = htonl(payload_size); msg_size+= Q_ALIGNED_SIZE(payload_size) + sizeof(int); /* Set the total message size */ *((int *)&msg_buf[sizeof(int)]) = htonl(msg_size - sizeof(int)); msg_size+= sizeof(int); *msg_size_ptr = msg_size; } /* *---------------------------------------------------------------------- * * q_comms_send_add_msg -- * * Sends a Q_MSG_TYPE_MSG_ADD message to a specified server. * * Results: * NS_TRUE * *---------------------------------------------------------------------- */ int q_comms_send_add_msg(struct q_server *server_ptr, struct q_message *msg_ptr) { char msg_buf[Q_MAX_MSG_SIZE]; int msg_size; // Ns_Log(Notice, "q_comms_distribute_msg: entering (%d)",Q_MAX_MSG_SIZE); /* * Build the message. */ q_comms_build_msg_add_msg(msg_buf, &msg_size, msg_ptr); /* * Sending the message to the server, making sure we're connected first. */ q_get_rlock(server_ptr); if (server_ptr->c_sock != -1) { /* * Lets send the message. */ if (server_send(server_ptr, msg_buf, msg_size) == -1) { Ns_Log(Error,"q_comms_send_add_msg: send error to %s", server_ptr->svr_name); } } else { Ns_Log(Error, "q_comms_send_add_msg: Send attempted to a disconnected server; %s", server_ptr->svr_name); } q_release_lock(server_ptr); return NS_TRUE; } /* *---------------------------------------------------------------------- * * q_comms_distribute_msg -- * * Distrubutes a Q_MSG_TYPE_MSG_ADD message amoungst all connected * servers. * * Results: * NS_TRUE * *---------------------------------------------------------------------- */ int q_comms_distribute_msg(struct q_server *ignored_server_ptr, struct q_message *msg_ptr) { int count; struct q_server *server_ptr; char msg_buf[Q_MAX_MSG_SIZE]; int msg_size; // Ns_Log(Notice, "q_comms_distribute_msg: entering (%d)",Q_MAX_MSG_SIZE); /* * Build the message. */ q_comms_build_msg_add_msg(msg_buf, &msg_size, msg_ptr); /* * Loop through each server, sending the constructed message. Note that * servers that are currently disconnected are ignored (of course this * includes this one). */ for (count = 0; count < g_cluster_size; count++) { server_ptr = g_cluster_order[count]; if (server_ptr != g_server && server_ptr != ignored_server_ptr) { q_get_rlock(server_ptr); if (server_ptr->c_sock != -1) { /* * Lets send the message. */ if (server_send(server_ptr, msg_buf, msg_size) == -1) { Ns_Log(Error,"q_comms_distribute_msg: send error to %s", server_ptr->svr_name); } } q_release_lock(server_ptr); } } return NS_TRUE; } /* *---------------------------------------------------------------------- * * q_comms_build_delegate_msg -- * * Builds a delegate message. * * Results: * NS_TRUE * *---------------------------------------------------------------------- */ static void q_comms_build_delegate_msg(unsigned char *msg_buf, int *msg_size_ptr, struct q_message *msg_ptr, int cur_hops) { int msg_size; int cur_pos; int payload_size; /* * Construct the message to be sent. * Message is of type Q_MSG_TYPE_MSG_ADD: */ msg_size = 0; /* Message id */ *((int *)&msg_buf[msg_size]) = htonl(Q_MSG_TYPE_MSG_DELEGATE); msg_size+= sizeof(int); /* Message size (skip this and fill in at the end */ msg_size+= sizeof(int); /* message id */ *((int *)&msg_buf[msg_size]) = htonl(strlen(msg_ptr->msg_id)); msg_size+= sizeof(int); strcpy(&msg_buf[msg_size], msg_ptr->msg_id); msg_size+= Q_ALIGNED_SIZE(strlen(msg_ptr->msg_id)); /* group name size, and group name */ *((int *)&msg_buf[msg_size]) = htonl(strlen(msg_ptr->grp_ptr->grp_name)); msg_size+= sizeof(int); strcpy(&msg_buf[msg_size], msg_ptr->grp_ptr->grp_name); msg_size+= Q_ALIGNED_SIZE(strlen(msg_ptr->grp_ptr->grp_name)); /* message type id */ *((int *)&msg_buf[msg_size]) = htonl(msg_ptr->type_ptr->msg_type_id); msg_size+= sizeof(int); /* message status */ // *((int *)&msg_buf[msg_size]) = htonl(msg_ptr->status); *((int *)&msg_buf[msg_size]) = htonl(Q_MSG_STATUS_READY); msg_size+= sizeof(int); /* max hops field */ *((int *)&msg_buf[msg_size]) = htonl(cur_hops); msg_size+= sizeof(int); /* message payload */ msg_ptr->type_ptr->construct_out_func(msg_ptr->msg_ptr, msg_ptr->msg_size, &msg_buf[msg_size+sizeof(int)], &payload_size); *((int *)&msg_buf[msg_size]) = htonl(payload_size); msg_size+= Q_ALIGNED_SIZE(payload_size) + sizeof(int); /* Set the total message size */ *((int *)&msg_buf[sizeof(int)]) = htonl(msg_size - sizeof(int)); msg_size+= sizeof(int); *msg_size_ptr = msg_size; return; } /* *---------------------------------------------------------------------- * * q_comms_send_delegate_msg -- * * Sends a delegate message to a specified server. Decrements the * cur_hops parameter before it sends the message. * * Results: * NS_TRUE - Successfully delegated * NS_FALSE - Unsuccessfully delegated * *---------------------------------------------------------------------- */ int q_comms_send_delegate_msg(struct q_server *remote_server_ptr, struct q_message *msg_ptr, int cur_hops) { int count; struct q_server *server_ptr; char msg_buf[Q_MAX_MSG_SIZE]; int msg_size; int cur_pos; int sent_flag; int payload_size; // Ns_Log(Notice, "q_comms_send_delegate_msg: Entering"); /* * If we've reached the maximum number of hops allowed for this message, * then return the failure notice for this deligation operation. */ if (cur_hops == 0) { return NS_FALSE; } cur_hops--; /* * Construct the message to be sent. */ q_comms_build_delegate_msg(msg_buf, &msg_size, msg_ptr, cur_hops); q_get_rlock(remote_server_ptr); if (remote_server_ptr->c_sock != -1) { /* * Lets send the message. */ if (server_send(remote_server_ptr, msg_buf, msg_size) == -1) { Ns_Log(Error,"q_comms_send_delegate_msg: send error to %s", remote_server_ptr->svr_name); } else { sent_flag = 1; } } else { Ns_Log(Error, "q_comms_send_delegate_msg: Send attempted to a disconnected server; %s", remote_server_ptr->svr_name); } q_release_lock(remote_server_ptr); if (sent_flag) { return NS_TRUE; } else { return NS_FALSE; } } /* *---------------------------------------------------------------------- * * q_comms_delegate_msg -- * * Passes a message onto the nearest neighbour server. Decrements the * cur_hops parameter before it sends the message. * The ignored_server_ptr may be populated if a particular server in * the server list should be ignored. NULL otherwise. * * Results: * NS_TRUE * *---------------------------------------------------------------------- */ int q_comms_delegate_msg(struct q_server *ignored_server_ptr, struct q_message *msg_ptr, int cur_hops) { int count; struct q_server *server_ptr; char msg_buf[Q_MAX_MSG_SIZE]; int msg_size; int cur_pos; int sent_flag; int payload_size; // Ns_Log(Notice, "q_comms_delegate_msg: Entering"); /* * If we've reached the maximum number of hops allowed for this message, * then return the failure notice for this deligation operation. */ if (cur_hops == 0) { return NS_FALSE; } cur_hops--; /* * Construct the message to be sent. */ q_comms_build_delegate_msg(msg_buf, &msg_size, msg_ptr, cur_hops); /* * Loop until we find our nearest neighbour, or we loop back round to * this server (ie, no connected servers exist). */ cur_pos = (g_this_server_pos + 1) % g_cluster_size; sent_flag = 0; while (cur_pos != g_this_server_pos && !sent_flag) { server_ptr = g_cluster_order[cur_pos]; if (server_ptr != ignored_server_ptr) { q_get_rlock(server_ptr); if (server_ptr->c_sock != -1) { /* * Lets send the message. */ if (server_send(server_ptr, msg_buf, msg_size) == -1) { Ns_Log(Error,"q_comms_delegate_msg: send error to %s", server_ptr->svr_name); } sent_flag = 1; } q_release_lock(server_ptr); } cur_pos = (cur_pos + 1) % g_cluster_size; } if (sent_flag) { return NS_TRUE; } else { return NS_FALSE; } } /* *---------------------------------------------------------------------- * * q_comms_msg_status_update -- * * Distrubutes a Q_MSG_TYPE_STATUS_UPDATE message amoungst all connected * servers. * * Results: * NS_TRUE * *---------------------------------------------------------------------- */ int q_comms_msg_status_update(struct q_message *msg_ptr) { int count; struct q_server *server_ptr; char msg_buf[Q_MAX_MSG_SIZE]; int msg_size; int n; /* * Construct the message to be sent. * Message is of type Q_MSG_TYPE_MSG_STATUS_UPDATE: */ msg_size = 0; /* Message id */ *((int *)&msg_buf[msg_size]) = htonl(Q_MSG_TYPE_MSG_STATUS_UPDATE); msg_size+= sizeof(int); /* Message size (skip this and fill in at the end */ msg_size+= sizeof(int); /* message id */ *((int *)&msg_buf[msg_size]) = htonl(strlen(msg_ptr->msg_id)); msg_size+= sizeof(int); strcpy(&msg_buf[msg_size], msg_ptr->msg_id); msg_size+= Q_ALIGNED_SIZE(strlen(msg_ptr->msg_id)); /* group name size, and group name */ *((int *)&msg_buf[msg_size]) = htonl(strlen(msg_ptr->grp_ptr->grp_name)); msg_size+= sizeof(int); strcpy(&msg_buf[msg_size], msg_ptr->grp_ptr->grp_name); msg_size+= Q_ALIGNED_SIZE(strlen(msg_ptr->grp_ptr->grp_name)); /* message status */ *((int *)&msg_buf[msg_size]) = htonl(msg_ptr->status); msg_size+= sizeof(int); /* Set the total message size */ *((int *)&msg_buf[sizeof(int)]) = htonl(msg_size - sizeof(int)); msg_size+= sizeof(int); /* * Loop through each server, sending the constructed message. Note that * servers that are currently disconnected are ignored (of course this * includes this one). */ for (count = 0; count < g_cluster_size; count++) { server_ptr = g_cluster_order[count]; if (server_ptr != g_server) { q_get_rlock(server_ptr); if (server_ptr->c_sock != -1) { /* * Lets send the message. */ if (server_send(server_ptr, msg_buf, msg_size) == -1) { Ns_Log(Error,"q_comms_msg_status_update: send error to %s", server_ptr->svr_name); } } q_release_lock(server_ptr); } } return NS_TRUE; } /* *---------------------------------------------------------------------- * * q_comms_build_can_process_msg -- * * Builds a Q_MSG_TYPE_MSG_CAN_PROCESS message. * * Results: * NS_TRUE * *---------------------------------------------------------------------- */ static void q_comms_build_can_process_msg(unsigned char *msg_buf, int *msg_size_ptr, struct q_group *group_ptr) { int payload_size; int msg_size; /* * Construct the message to be sent. * Message is of type Q_MSG_TYPE_CAN_PROCESS: */ msg_size = 0; /* Message id */ *((int *)&msg_buf[msg_size]) = htonl(Q_MSG_TYPE_MSG_CAN_PROCESS); msg_size+= sizeof(int); /* Message size (skip this and fill in at the end */ msg_size+= sizeof(int); /* group name size, and group name */ *((int *)&msg_buf[msg_size]) = htonl(strlen(group_ptr->grp_name)); msg_size+= sizeof(int); strcpy(&msg_buf[msg_size], group_ptr->grp_name); msg_size+= Q_ALIGNED_SIZE(strlen(group_ptr->grp_name)); /* can_process_val */ *((int *)&msg_buf[msg_size]) = htonl(group_ptr->can_process_p); msg_size+= sizeof(int); /* Set the total message size */ *((int *)&msg_buf[sizeof(int)]) = htonl(msg_size - sizeof(int)); msg_size+= sizeof(int); *msg_size_ptr = msg_size; } /* *---------------------------------------------------------------------- * * q_comms_can_process_msg -- * * Distrubutes a Q_MSG_TYPE_CAN_PROCESS message amoungst all connected * servers. * * Results: * NS_TRUE * *---------------------------------------------------------------------- */ int q_comms_can_process_msg(struct q_group *group_ptr) { int count; struct q_server *server_ptr; char msg_buf[Q_MAX_MSG_SIZE]; int msg_size; /* * Construct the message to be sent. */ q_comms_build_can_process_msg(msg_buf, &msg_size, group_ptr); /* * Loop through each server, sending the constructed message. Note that * servers that are currently disconnected are ignored (of course this * includes this one). */ for (count = 0; count < g_cluster_size; count++) { server_ptr = g_cluster_order[count]; if (server_ptr != g_server) { q_get_rlock(server_ptr); if (server_ptr->c_sock != -1) { /* * Lets send the message. */ if (server_send(server_ptr, msg_buf, msg_size) == -1) { Ns_Log(Error,"q_comms_can_process_msg: send error to %s", server_ptr->svr_name); } } q_release_lock(server_ptr); } } return NS_TRUE; } /* *---------------------------------------------------------------------- * * q_comms_send_can_process_msg -- * * Sends a Q_MSG_TYPE_CAN_PROCESS message to a specified remote server. * * Results: * NS_TRUE * *---------------------------------------------------------------------- */ int q_comms_send_can_process_msg(struct q_server *server_ptr, struct q_group *group_ptr) { int count; char msg_buf[Q_MAX_MSG_SIZE]; int msg_size; /* * Construct the message to be sent. */ q_comms_build_can_process_msg(msg_buf, &msg_size, group_ptr); /* * Send the constructed message. */ q_get_rlock(server_ptr); if (server_ptr->c_sock != -1) { /* * Lets send the message. */ if (server_send(server_ptr, msg_buf, msg_size) == -1) { Ns_Log(Error,"q_comms_can_process_msg: send error to %s", server_ptr->svr_name); } } else { Ns_Log(Error, "q_comms_send_can_process_msg: Send attempted to a disconnected server; %s", server_ptr->svr_name); } q_release_lock(server_ptr); return NS_TRUE; } /* *---------------------------------------------------------------------- * * q_server_connect_dump_messages -- * * Called when a client connect succeeds to a remote server. This function * loops through all local queues, sending a MESSAGE_ADD message, * thus populating the newly connected servers own copy of this servers * queue. * * Results: * *---------------------------------------------------------------------- */ int q_comms_server_connect_dump_messages(struct q_server *remote_server_ptr) { struct q_group *group_ptr; Tcl_HashEntry *grp_entry_ptr; Tcl_HashEntry *grp_search_ptr; Tcl_HashSearch grp_search; struct q_message *message_ptr; struct q_message_search msg_search; q_get_wlock(g_server); Ns_Log(Notice,"Dumping queued messages to server %s", remote_server_ptr->svr_name); grp_search_ptr = Tcl_FirstHashEntry(&g_server->groups_ht, &grp_search); while (grp_search_ptr != NULL) { group_ptr = Tcl_GetHashValue(grp_search_ptr); q_get_rlock(group_ptr); q_comms_send_can_process_msg(remote_server_ptr, group_ptr); Ns_Log(Notice,"Dumping group %s", group_ptr->grp_name); message_ptr = q_data_first_msg_entry(group_ptr, -1, &msg_search); while (message_ptr != NULL) { Ns_Log(Notice,"Dumping message %s", message_ptr->msg_id); /* * Recover this individual message. */ q_comms_send_add_msg(remote_server_ptr, message_ptr); message_ptr = q_data_next_msg_entry(&msg_search); } q_release_lock(group_ptr); grp_search_ptr = Tcl_NextHashEntry(&grp_search); } q_release_lock(g_server); return NS_TRUE; } /* *---------------------------------------------------------------------- * * q_comms_recover_server -- * * This function is called when a connection to a remote server * fails. The list of servers is checked and if we're the nearest * neighbour to the failed server, recover each message. * * Results: * None. * *---------------------------------------------------------------------- */ int q_comms_recover_server(struct q_server *failed_server_ptr) { int cur_pos; int complete_flag; int connected_count; /* Count of the number of connected servers */ /* between this one and the failed server */ struct q_server *server_ptr; struct q_group *group_ptr; Tcl_HashEntry *grp_entry_ptr; Tcl_HashEntry *grp_search_ptr; Tcl_HashSearch grp_search; struct q_message *message_ptr; struct q_message_search msg_search; Ns_Log(Notice,"Recovering server %x, %s", failed_server_ptr, failed_server_ptr->svr_name); /* * Loop until we find our nearest neighbour, or we loop back round to * this server (ie, no connected servers exist). */ cur_pos = (g_this_server_pos + 1) % g_cluster_size; complete_flag = 0; connected_count = 0; while (cur_pos != g_this_server_pos && !complete_flag) { server_ptr = g_cluster_order[cur_pos]; if (server_ptr == failed_server_ptr) { complete_flag = 1; } else { q_get_rlock(server_ptr); if (server_ptr->c_sock != -1) { connected_count++; } q_release_lock(server_ptr); } cur_pos = (cur_pos + 1) % g_cluster_size; } /* * If connected_count indicates that there are no connected servers between * this server and the failed server, then we are the nearest neighbour. * Recover the messages. */ Ns_Log(Notice,"Recovering (2) server %x", failed_server_ptr); Ns_Log(Notice,"Waiting for lock on server %x, %s", failed_server_ptr, failed_server_ptr->svr_name); if (connected_count == 0) { q_get_wlock(failed_server_ptr); Ns_Log(Notice,"Recovering server %x, %s", failed_server_ptr, failed_server_ptr->svr_name); grp_search_ptr = Tcl_FirstHashEntry(&failed_server_ptr->groups_ht, &grp_search); while (grp_search_ptr != NULL) { group_ptr = Tcl_GetHashValue(grp_search_ptr); q_get_wlock(group_ptr); Ns_Log(Notice,"Recovering group %x, %s", group_ptr, group_ptr->grp_name); message_ptr = q_data_first_msg_entry(group_ptr, -1, &msg_search); while (message_ptr != NULL) { Ns_Log(Notice,"Recovering message %s", message_ptr->msg_id); /* * Recover this individual message. */ q_comms_delete_recovered_message(failed_server_ptr, message_ptr); q_recover_msg(failed_server_ptr, message_ptr); Ns_Log(Notice,"Recovered message %s", message_ptr->msg_id); message_ptr = q_data_next_msg_entry(&msg_search); } q_release_lock(group_ptr); grp_search_ptr = Tcl_NextHashEntry(&grp_search); } q_release_lock(failed_server_ptr); } return NS_TRUE; } /* *---------------------------------------------------------------------- * * q_comms_delete_recovered_message -- * * Distrubutes a Q_MSG_TYPE_MSG_DELETE_RECOVERED message amoungst all * connected servers except the failed one. * * Results: * NS_TRUE * *---------------------------------------------------------------------- */ int q_comms_delete_recovered_message(struct q_server *failed_server_ptr, struct q_message *msg_ptr) { int count; struct q_server *server_ptr; char msg_buf[Q_MAX_MSG_SIZE]; int msg_size; /* * Construct the message to be sent. * Message is of type Q_MSG_TYPE_MSG_DELETE_RECOVERED: */ msg_size = 0; /* Message id */ *((int *)&msg_buf[msg_size]) = htonl(Q_MSG_TYPE_MSG_DELETE_RECOVERED); msg_size+= sizeof(int); /* Message size (skip this and fill in at the end */ msg_size+= sizeof(int); /* message id */ *((int *)&msg_buf[msg_size]) = htonl(strlen(msg_ptr->msg_id)); msg_size+= sizeof(int); strcpy(&msg_buf[msg_size], msg_ptr->msg_id); msg_size+= Q_ALIGNED_SIZE(strlen(msg_ptr->msg_id)); /* server name size, and group name */ *((int *)&msg_buf[msg_size]) = htonl(strlen(failed_server_ptr->svr_name)); msg_size+= sizeof(int); strcpy(&msg_buf[msg_size], failed_server_ptr->svr_name); msg_size+= Q_ALIGNED_SIZE(strlen(failed_server_ptr->svr_name)); /* group name size, and group name */ *((int *)&msg_buf[msg_size]) = htonl(strlen(msg_ptr->grp_ptr->grp_name)); msg_size+= sizeof(int); strcpy(&msg_buf[msg_size], msg_ptr->grp_ptr->grp_name); msg_size+= Q_ALIGNED_SIZE(strlen(msg_ptr->grp_ptr->grp_name)); /* Set the total message size */ *((int *)&msg_buf[sizeof(int)]) = htonl(msg_size - sizeof(int)); msg_size+= sizeof(int); /* * Loop through each server, sending the constructed message. Note that * servers that are currently disconnected are ignored (of course this * includes this one), and also ignore the failed server (so we don't * conflict with any locks the calling procedure may have). */ for (count = 0; count < g_cluster_size; count++) { server_ptr = g_cluster_order[count]; if (server_ptr != g_server && server_ptr != failed_server_ptr) { q_get_rlock(server_ptr); if (server_ptr->c_sock != -1) { /* * Lets send the message. */ if (server_send(server_ptr, msg_buf, msg_size) == -1) { Ns_Log(Error,"q_comms_delete_recovered_message: send error to %s", server_ptr->svr_name); } } q_release_lock(server_ptr); } } return NS_TRUE; }