/* * This file is part of QCluster. * * QCluster is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * QCluster is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with QCluster; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA * * Peter Harper * * $Id: cluster.c,v 1.1 2003/12/02 06:17:38 rmello Exp $ */ static char rcsid[] = "@(#) $Id: "; #include "ns.h" #include "cluster.h" #include "clustercomms.h" #include "message.h" #include "locks.h" /* * Static procedures. */ int q_data_get_ll_entry(struct q_group *group_ptr, char *msg_id, struct q_message_entry **queue_entry_ptr, struct q_message_entry **queue_by_stat_entry_ptr); /* * Global qcluster data */ Ns_RWLock g_cluster_mutex; Tcl_HashTable g_cluster; struct q_server *g_cluster_order[Q_MAX_SERVERS]; int g_cluster_size; int g_this_server_pos; char *g_addr; char *g_iam; struct q_server *g_server; int g_port; int g_qcluster_started = 0; Ns_Mutex g_recover_mutex; /* * Macros to add and delete a message to/from a linked list */ #define Q_DATA_LIST_APPEND(msg_ptr, list_ptr) \ { \ struct q_message_entry *entry_ptr; \ entry_ptr = (struct q_message_entry *) \ Ns_Malloc(sizeof(struct q_message_entry)); \ entry_ptr->msg_ptr = msg_ptr; \ if (*list_ptr == NULL) { \ *list_ptr = entry_ptr; \ entry_ptr->prev_ptr = NULL; \ entry_ptr->next_ptr = NULL; \ } else { \ struct q_message_entry *search_ptr; \ search_ptr = *list_ptr; \ while (search_ptr->next_ptr != NULL) { \ search_ptr = search_ptr->next_ptr; \ } \ entry_ptr->next_ptr = NULL; \ 
entry_ptr->prev_ptr = search_ptr; \ search_ptr->next_ptr = entry_ptr; \ } \ } #define Q_DATA_LIST_DELETE(entry_ptr, list_ptr) \ { \ if (entry_ptr->prev_ptr == entry_ptr->next_ptr && \ entry_ptr->next_ptr == NULL) { \ *list_ptr = NULL; \ } else { \ if (entry_ptr->prev_ptr != NULL) { \ entry_ptr->prev_ptr->next_ptr = entry_ptr->next_ptr; \ } \ if (entry_ptr->next_ptr != NULL) { \ entry_ptr->next_ptr->prev_ptr = entry_ptr->prev_ptr; \ } \ if (*list_ptr == entry_ptr) { \ *list_ptr = entry_ptr->next_ptr; \ } \ } \ ns_free(entry_ptr); \ } /**************************************************************************** * The following section contains functions for manipulating message * queues within groups. These are meant as a complete API between * manipulating functions and the underlying data used to model the * message queues. * * The original code used a hash table to model the messages, but this was * changed to include a linked list in addition to this, so that an explicit * ordering (based on time) could be used. * * NB: All these functions assume that locks on the specified group have * *ALREADY BEEN ASSIGNED*. No explicit locking is done in these * functions. ****************************************************************************/ /* *---------------------------------------------------------------------- * * q_data_add_msg -- * * Adds the specified message to a group * * Results: * * Side effects: * New EvalThread will be created. 
* *---------------------------------------------------------------------- */ int q_data_add_msg(struct q_group *group_ptr, struct q_message *msg_ptr) { Tcl_HashEntry *entry_ptr; int new_flag; // Ns_Log(Notice,"q_data_add_msg: entry (%s)", msg_ptr->msg_id); /* * Create the linked list entry */ Q_DATA_LIST_APPEND(msg_ptr, (&group_ptr->queue_ll)); Q_DATA_LIST_APPEND(msg_ptr, (&group_ptr->queue_by_stat_ll[msg_ptr->status])); // entry_ptr = Tcl_CreateHashEntry(&group_ptr->queue_ht, // msg_id, &new_flag); // Tcl_SetHashValue(entry_ptr, (void *)msg_ptr); // Ns_Log(Notice,"q_data_add_msg: exit (%s)", msg_ptr->msg_id); return NS_TRUE; } /* *---------------------------------------------------------------------- * * q_data_delete_msg -- * * Deletes the specified message from the group * * Results: * *---------------------------------------------------------------------- */ int q_data_delete_msg(struct q_group *group_ptr, char *msg_id, int free_msg_ptr_p, int free_payload_ptr_p) { struct q_message_entry *main_entry_ptr; struct q_message_entry *by_stat_entry_ptr; // Ns_Log(Notice,"q_data_delete_msg: entry (%s)", msg_id); if (q_data_get_ll_entry(group_ptr, msg_id, &main_entry_ptr, &by_stat_entry_ptr) == NS_FALSE) { Ns_Log(Error, "q_data_delete_msg: Message id %s in group %s not found", msg_id, group_ptr->grp_name); // Ns_Log(Notice,"q_data_delete_msg: exit NS_FALSE (%s)", msg_id); return NS_FALSE; } else { if (free_payload_ptr_p == NS_TRUE) { ns_free(main_entry_ptr->msg_ptr->msg_ptr); } if (free_msg_ptr_p == NS_TRUE) { ns_free(main_entry_ptr->msg_ptr); } Q_DATA_LIST_DELETE(main_entry_ptr, &group_ptr->queue_ll); Q_DATA_LIST_DELETE(\ by_stat_entry_ptr, (&group_ptr->queue_by_stat_ll[by_stat_entry_ptr->msg_ptr->status])); // Ns_Log(Notice,"q_data_delete_msg: exit NS_TRUE (%s)", msg_id); return NS_TRUE; } } /* *---------------------------------------------------------------------- * * q_data_get_ll_entry -- * * Searches for entries in each linked list, given a message id. 
* * Results: * NS_TRUE if entries in both linked lists were found. * NS_FALSE if entries in both linked lists were not found. * *---------------------------------------------------------------------- */ int q_data_get_ll_entry(struct q_group *group_ptr, char *msg_id, struct q_message_entry **queue_entry_ptr, struct q_message_entry **queue_by_stat_entry_ptr) { struct q_message_entry *entry_ptr; struct q_message_entry *found_ptr; /* * Search for an entry in the main message linked list. */ entry_ptr = group_ptr->queue_ll; found_ptr = NULL; while (entry_ptr != NULL && found_ptr == NULL) { if (!strcmp(entry_ptr->msg_ptr->msg_id, msg_id)) { found_ptr = entry_ptr; } entry_ptr = entry_ptr->next_ptr; } *queue_entry_ptr = found_ptr; /* * Search for an entry in the status specific message linked list. */ if (found_ptr != NULL) { entry_ptr = group_ptr->queue_by_stat_ll[found_ptr->msg_ptr->status]; found_ptr = NULL; while (entry_ptr != NULL && found_ptr == NULL) { if (!strcmp(entry_ptr->msg_ptr->msg_id, msg_id)) { found_ptr = entry_ptr; } entry_ptr = entry_ptr->next_ptr; } } *queue_by_stat_entry_ptr = found_ptr; /* * If either queue entry wasn't found, return NS_FALSE. Otherwise return * NS_TRUE; */ if (queue_entry_ptr == NULL || queue_by_stat_entry_ptr == NULL) { return NS_FALSE; } else { return NS_TRUE; } } /* *---------------------------------------------------------------------- * * q_data_get_msg_entry -- * * Searches for a message entry matching the specified message id. * * Results: * NS_TRUE if entries in both linked lists were found. * NS_FALSE if entries in both linked lists were not found. * *---------------------------------------------------------------------- */ struct q_message * q_data_get_msg_entry(struct q_group *group_ptr, char *msg_id) { struct q_message_entry *entry_ptr; struct q_message_entry *found_ptr; /* * Search for an entry in the main message linked list. 
*/ entry_ptr = group_ptr->queue_ll; found_ptr = NULL; while (entry_ptr != NULL && found_ptr == NULL) { //Ns_Log(Notice,"Entry %x -> %x", entry_ptr, entry_ptr->next_ptr); if (!strcmp(entry_ptr->msg_ptr->msg_id, msg_id)) { found_ptr = entry_ptr; } entry_ptr = entry_ptr->next_ptr; } if (found_ptr == NULL) { return NULL; } else { return found_ptr->msg_ptr; } } /* *---------------------------------------------------------------------- * * q_data_update_msg_status -- * * This function should be called when the status of a message * structure is changed. This function moves the message to the * appropriate status specific linked list. * * Results: * NS_TRUE if entries in both linked lists were found. * NS_FALSE if entries in both linked lists were not found. * *---------------------------------------------------------------------- */ int q_data_update_msg_status(struct q_group *group_ptr, struct q_message *msg_ptr, int new_status) { struct q_message_entry *main_entry_ptr; struct q_message_entry *by_stat_entry_ptr; if (q_data_get_ll_entry(group_ptr, msg_ptr->msg_id, &main_entry_ptr, &by_stat_entry_ptr) == NS_FALSE) { Ns_Log(Error, "q_data_update_msg_status: Message id %s in group %s not found", msg_ptr->msg_id, group_ptr->grp_name); return NS_FALSE; } else { Q_DATA_LIST_DELETE(\ by_stat_entry_ptr, &group_ptr->queue_by_stat_ll[msg_ptr->status]); msg_ptr->status = new_status; Q_DATA_LIST_APPEND(msg_ptr, &group_ptr->queue_by_stat_ll[msg_ptr->status]); return NS_TRUE; } } /* *---------------------------------------------------------------------- * * q_data_first_msg_entry -- * * Call this function to retrieve the first entry in the message queue. * If specific_status is -1, all messages will be trawled, otherwise only * those messages with the appropriate status. * * Results: * NULL if no messages in the queue, pointer to the message otherwise. 
*
 *----------------------------------------------------------------------
 */

struct q_message *
q_data_first_msg_entry(struct q_group *group_ptr, int specific_status,
                       struct q_message_search *search_ptr)
{
    /*
     * Remember which queue is being walked, then position the cursor on
     * its head: the main queue for -1, otherwise the queue indexed by
     * the requested status.
     */
    search_ptr->specific_status = specific_status;
    search_ptr->cur_ptr = (specific_status == -1)
        ? group_ptr->queue_ll
        : group_ptr->queue_by_stat_ll[specific_status];

    if (search_ptr->cur_ptr != NULL) {
        /*
         * Cache the neighbours of the current entry in the search
         * state so the next step does not have to read them from the
         * entry again.
         */
        search_ptr->prev_ptr = search_ptr->cur_ptr->prev_ptr;
        search_ptr->next_ptr = search_ptr->cur_ptr->next_ptr;
        return search_ptr->cur_ptr->msg_ptr;
    }

    /* Empty queue: leave the search state fully cleared. */
    search_ptr->prev_ptr = NULL;
    search_ptr->next_ptr = NULL;
    return NULL;
}

/*
 *----------------------------------------------------------------------
 *
 * q_data_next_msg_entry --
 *
 *      Advances a search started with q_data_first_msg_entry to the
 *      following message, using the next pointer cached in the search
 *      state.
 *
 * Results:
 *      NULL if no further messages in the queue, pointer to the message
 *      otherwise.
 *
 *----------------------------------------------------------------------
 */

struct q_message *
q_data_next_msg_entry(struct q_message_search *search_ptr)
{
    /* Step to the entry recorded as "next" by the previous call. */
    search_ptr->cur_ptr = search_ptr->next_ptr;

    if (search_ptr->cur_ptr != NULL) {
        search_ptr->prev_ptr = search_ptr->cur_ptr->prev_ptr;
        search_ptr->next_ptr = search_ptr->cur_ptr->next_ptr;
        return search_ptr->cur_ptr->msg_ptr;
    }

    /* Ran off the end of the list: clear the search state. */
    search_ptr->prev_ptr = NULL;
    search_ptr->next_ptr = NULL;
    return NULL;
}