57#include <sys/socket.h>
62#include <netinet/in.h>
84#define LOGSYS_UTILS_ONLY 1
95#define LOCALHOST_IP inet_addr("127.0.0.1")
96#define QUEUE_RTR_ITEMS_SIZE_MAX 16384
97#define RETRANS_MESSAGE_QUEUE_SIZE_MAX 16384
98#define RECEIVED_MESSAGE_QUEUE_SIZE_MAX 500
100#define RETRANSMIT_ENTRIES_MAX 30
101#define TOKEN_SIZE_MAX 64000
102#define LEAVE_DUMMY_NODEID 0
121#define SEQNO_START_MSG 0x0
122#define SEQNO_START_TOKEN 0x0
144#define ENDIAN_LOCAL 0xff22
457 unsigned int msg_len,
462 const unsigned int *member_list,
size_t member_list_entries,
584static int message_handler_orf_token (
590static int message_handler_mcast (
596static int message_handler_memb_merge_detect (
602static int message_handler_memb_join (
608static int message_handler_memb_commit_token (
614static int message_handler_token_hold_cancel (
622static void srp_addr_to_nodeid (
626 unsigned int entries);
641static void target_set_completed (
void *context);
643static void memb_state_commit_token_target_set (
struct totemsrp_instance *instance);
651static void mcast_endian_convert (
const struct mcast *
in,
struct mcast *
out);
652static void memb_merge_detect_endian_convert (
656static void timer_function_orf_token_timeout (
void *data);
657static void timer_function_orf_token_warning (
void *data);
658static void timer_function_pause_timeout (
void *data);
659static void timer_function_heartbeat_timeout (
void *data);
660static void timer_function_token_retransmit_timeout (
void *data);
661static void timer_function_token_hold_retransmit_timeout (
void *data);
662static void timer_function_merge_detect_timeout (
void *data);
670 unsigned int msg_len,
681 message_handler_orf_token,
682 message_handler_mcast,
683 message_handler_memb_merge_detect,
684 message_handler_memb_join,
685 message_handler_memb_commit_token,
686 message_handler_token_hold_cancel
690#define log_printf(level, format, args...) \
692 instance->totemsrp_log_printf ( \
693 level, instance->totemsrp_subsys_id, \
694 __FUNCTION__, __FILE__, __LINE__, \
697#define LOGSYS_PERROR(err_num, level, fmt, args...) \
699 char _error_str[LOGSYS_MAX_PERROR_MSG_LEN]; \
700 const char *_error_ptr = qb_strerror_r(err_num, _error_str, sizeof(_error_str)); \
701 instance->totemsrp_log_printf ( \
702 level, instance->totemsrp_subsys_id, \
703 __FUNCTION__, __FILE__, __LINE__, \
704 fmt ": %s (%d)\n", ##args, _error_ptr, err_num); \
759 "Process pause detected for %d ms, flushing membership messages.", (
unsigned int)(
now_msec -
timestamp_msec));
804static void totempg_mtu_changed(
void *context,
int net_mtu)
811 "Net MTU changed to %d, new value is %d",
827 unsigned int msg_len,
832 const unsigned int *member_list,
size_t member_list_entries,
837 int waiting_trans_ack))
843 if (instance ==
NULL) {
847 totemsrp_instance_initialize (instance);
885 "Token Timeout (%d ms) retransmit timeout (%d ms)",
890 "Token warning every %d ms (%d%% of Token Timeout)",
894 "The token warning interval (%d ms) is less than the token retransmit timeout (%d ms) "
895 "which can lead to spurious token warnings. Consider increasing the token_warning parameter.",
899 "Token warnings disabled");
902 "token hold (%d ms) retransmits before loss (%d retrans)",
905 "join (%d ms) send_join (%d ms) consensus (%d ms) merge (%d ms)",
912 "downcheck (%d ms) fail to recv const (%d msgs)",
918 "window size per rotation (%d messages) maximum messages per rotation (%d messages)",
922 "missed count const (%d messages)",
950 timer_function_pause_timeout (instance);
954 "HeartBeat is Disabled. To enable set heartbeat_failures_allowed > 0");
965 "total heartbeat_timeout (%d ms) is not less than token timeout (%d ms)",
969 "heartbeat_timeout = heartbeat_failures_allowed * token_retransmit_timeout + max_network_delay");
971 "heartbeat timeout should be less than the token timeout. Heartbeat is disabled!!");
989 target_set_completed);
1011 token_event_stats_collector,
1017 token_event_stats_collector,
1031 memb_leave_message_send (instance);
1150 if (
a->nodeid ==
b->nodeid) {
1156static void srp_addr_to_nodeid (
1160 unsigned int entries)
1164 for (
i = 0;
i < entries;
i++) {
1183static void memb_set_subtract (
1212static void memb_consensus_set (
1236static int memb_consensus_isset (
1253static int memb_consensus_agreed (
1266 if (memb_consensus_isset (instance, &
token_memb[
i]) == 0) {
1286static void memb_consensus_notset (
1298 if (memb_consensus_isset (instance, &instance->
my_proc_list[
i]) == 0) {
1308static int memb_set_equal (
1338static int memb_set_subset (
1365static void memb_set_merge (
1389static void memb_set_and_with_ring_id (
1423static void memb_set_log(
1452static void my_leave_memb_clear(
1459static unsigned int my_leave_memb_match(
1464 unsigned int ret = 0;
1475static void my_leave_memb_set(
1521 timer_function_token_retransmit_timeout,
1522 &instance->timer_orf_token_retransmit_timeout);
1538 timer_function_merge_detect_timeout,
1539 &instance->timer_merge_detect_timeout);
1567 "Saving state aru %x high seq received %x",
1577 "Restoring instance->my_aru %x my high seq received %x",
1584 "Resetting old ring state");
1597 timer_function_pause_timeout,
1598 &instance->timer_pause_timeout);
1612 timer_function_orf_token_warning,
1613 &instance->timer_orf_token_warning);
1627 timer_function_orf_token_timeout,
1628 &instance->timer_orf_token_timeout);
1634 reset_token_warning(instance);
1645 timer_function_heartbeat_timeout,
1646 &instance->timer_heartbeat_timeout);
1661 cancel_token_warning(instance);
1673static void start_token_hold_retransmit_timeout (
struct totemsrp_instance *instance)
1681 timer_function_token_hold_retransmit_timeout,
1682 &instance->timer_orf_token_hold_retransmit_timeout);
1688static void cancel_token_hold_retransmit_timeout (
struct totemsrp_instance *instance)
1694static void memb_state_consensus_timeout_expired (
1701 if (memb_consensus_agreed (instance)) {
1702 memb_consensus_reset (instance);
1704 memb_consensus_set (instance, &instance->
my_id);
1706 reset_token_timeout (instance);
1708 memb_consensus_notset (
1728static void timer_function_pause_timeout (
void *data)
1733 reset_pause_timeout (instance);
1738 old_ring_state_restore (instance);
1743static void timer_function_orf_token_warning (
void *data)
1754 reset_token_warning(instance);
1756 cancel_token_warning(instance);
1760static void timer_function_orf_token_timeout (
void *data)
1767 "The token was lost in the OPERATIONAL state.");
1769 "A processor failed, forming new configuration:"
1770 " token timed out (%ums), waiting %ums for consensus.",
1780 "The consensus timeout expired (%ums).",
1782 memb_state_consensus_timeout_expired (instance);
1789 "The token was lost in the COMMIT state.");
1796 "The token was lost in the RECOVERY state.");
1797 memb_recovery_state_token_loss (instance);
1803static void timer_function_heartbeat_timeout (
void *data)
1807 "HeartBeat Timer expired Invoking token loss mechanism in state %d ", instance->
memb_state);
1808 timer_function_orf_token_timeout(data);
1811static void memb_timer_function_state_gather (
void *data)
1823 memb_join_message_send (instance);
1834 memb_timer_function_state_gather,
1835 &instance->memb_timer_state_gather_join_timeout);
1844static void memb_timer_function_gather_consensus_timeout (
void *data)
1847 memb_state_consensus_timeout_expired (instance);
1850static void deliver_messages_from_recovery_to_regular (
struct totemsrp_instance *instance)
1855 unsigned int range = 0;
1944 memb_consensus_reset (instance);
1946 old_ring_state_reset (instance);
1948 deliver_messages_from_recovery_to_regular (instance);
1951 "Delivering to app %x to %x",
2112 if (my_leave_memb_match(instance,
left_list[
i]) == 0) {
2128 my_leave_memb_clear(instance);
2131 "entering OPERATIONAL state.");
2141 "Failed to receive the leave message.%s",
2152 reset_pause_timeout (instance);
2165static void memb_state_gather_enter (
2176 &instance->
my_id, 1,
2179 memb_join_message_send (instance);
2190 memb_timer_function_state_gather,
2191 &instance->memb_timer_state_gather_join_timeout);
2206 memb_timer_function_gather_consensus_timeout,
2207 &instance->memb_timer_state_gather_consensus_timeout);
2215 cancel_token_retransmit_timeout (instance);
2216 cancel_token_timeout (instance);
2217 cancel_merge_detect_timeout (instance);
2219 memb_consensus_reset (instance);
2221 memb_consensus_set (instance, &instance->
my_id);
2224 "entering GATHER state from %d(%s).",
2240static void timer_function_token_retransmit_timeout (
void *data);
2242static void target_set_completed (
2247 memb_state_commit_token_send (instance);
2251static void memb_state_commit_enter (
2254 old_ring_state_save (instance);
2256 memb_state_commit_token_update (instance);
2258 memb_state_commit_token_target_set (instance);
2275 "entering COMMIT state.");
2278 reset_token_retransmit_timeout (instance);
2279 reset_token_timeout (instance);
2295static void memb_state_recovery_enter (
2302 unsigned int range = 0;
2312 "entering RECOVERY state.");
2323 memb_state_commit_token_send_recovery (instance, commit_token);
2332 commit_token->addr_entries);
2342 memb_set_and_with_ring_id (
2364 "aru %x high delivered %x received flag %d",
2374 for (
i = 0;
i < commit_token->addr_entries;
i++) {
2393 for (
i = 0;
i < commit_token->addr_entries;
i++) {
2425 "copying all old ring messages from %x-%x.",
2468 "Did not need to originate any messages in recovery.");
2478 reset_token_timeout (instance);
2479 reset_token_retransmit_timeout (instance);
2492 token_hold_cancel_send (instance);
2590static int orf_token_remcast (
2634static void messages_free (
2643 unsigned int range = 0;
2684 "releasing messages up to and including %x",
release_to);
2688static void update_aru (
2724static int orf_token_mcast (
2739 reset_token_retransmit_timeout (instance);
2791 update_aru (instance);
2803static int orf_token_rtr (
2813 unsigned int range = 0;
2947static void timer_function_token_retransmit_timeout (
void *data)
2957 token_retransmit (instance);
2958 reset_token_retransmit_timeout (instance);
2963static void timer_function_token_hold_retransmit_timeout (
void *data)
2974 token_retransmit (instance);
2979static void timer_function_merge_detect_timeout(
void *data)
2988 memb_merge_detect_transmit (instance);
3001static int token_send (
3100static void memb_state_commit_token_update (
3167static void memb_state_commit_token_target_set (
3181static int memb_state_commit_token_send_recovery (
3207 reset_token_retransmit_timeout (instance);
3211static int memb_state_commit_token_send (
3236 reset_token_retransmit_timeout (instance);
3266static int srp_addr_compare (
const void *
a,
const void *
b)
3280static void memb_state_commit_token_create (
3289 "Creating commit token because I am the rep.");
3341 ((instance->my_proc_list_entries + instance->my_failed_list_entries) *
sizeof(
struct srp_addr));
3345 "memb_join_message too long. Ignoring message.");
3399 "sending join/leave message");
3406 &instance->
my_id, 1,
3411 &instance->
my_id, 1);
3418 "memb_leave message too long. Ignoring message.");
3488static void memb_ring_id_set (
3507 token_hold_cancel_send (instance);
3510 if (callback_handle == 0) {
3516 callback_handle->
data = (
void *)
data;
3518 callback_handle->
delete =
delete;
3544static void token_callbacks_execute (
3578 if (
res == -1 &&
del == 1) {
3613static int fcc_calculate (
3618 unsigned int backlog_calc;
3626 instance->
my_cbl = backlog_get (instance);
3646static void fcc_rtr_limit (
3664static void fcc_token_update (
3678static int check_orf_token_sanity(
3691 "Received orf_token message is too long... ignoring.");
3696 if (msg_len <
sizeof(
struct orf_token)) {
3698 "Received orf_token message is too short... ignoring.");
3711 "Received orf_token message rtr_entries is corrupted... ignoring.");
3719 "Received orf_token message is too short... ignoring.");
3727static int check_mcast_sanity(
3734 if (msg_len <
sizeof(
struct mcast)) {
3736 "Received mcast message is too short... ignoring.");
3744static int check_memb_merge_detect_sanity(
3753 "Received memb_merge_detect message is too short... ignoring.");
3761static int check_memb_join_sanity(
3772 if (msg_len <
sizeof(
struct memb_join)) {
3774 "Received memb_join message is too short... ignoring.");
3790 "Received memb_join message is too short... ignoring.");
3798static int check_memb_commit_token_sanity(
3810 "Received memb_commit_token message is too short... ignoring.");
3824 "Received memb_commit_token message is too short... ignoring.");
3832static int check_token_hold_cancel_sanity(
3841 "Received token_hold_cancel message is too short... ignoring.");
3859static int message_handler_orf_token (
3886 if (check_orf_token_sanity(instance, msg, msg_len,
sizeof(
token_storage),
3894#ifdef TEST_DROP_ORF_TOKEN_PERCENTAGE
3901 orf_token_endian_convert ((
struct orf_token *)msg,
3920 start_merge_detect_timeout (instance);
3923 cancel_merge_detect_timeout (instance);
3924 cancel_token_hold_retransmit_timeout (instance);
3930#ifdef TEST_RECOVERY_MSG_COUNT
3970 messages_free (instance, token->
aru);
3989 reset_heartbeat_timeout(instance);
3992 cancel_heartbeat_timeout(instance);
4034 if (sq_lt_compare (instance->
my_aru, token->
aru) ||
4039 if (token->
aru == token->
seq) {
4060 "FAILED TO RECEIVE");
4064 memb_set_merge (&instance->
my_id, 1,
4091 "token retrans flag is %d my set retrans flag%d retrans queue empty %d count %d, aru %x",
4104 "install seq %x aru %x high seq received %x",
4122 "retrans flag count %x token aru %x install seq %x aru %x %x",
4126 memb_state_operational_enter (instance);
4144 messages_deliver_to_app (instance, 0,
4152 reset_token_timeout (instance);
4153 reset_token_retransmit_timeout (instance);
4157 start_token_hold_retransmit_timeout (instance);
4167 reset_heartbeat_timeout(instance);
4170 cancel_heartbeat_timeout(instance);
4176static void messages_deliver_to_app (
4186 unsigned int range = 0;
4266 "Delivering MCAST message with seq %x to pending delivery queue",
4283static int message_handler_mcast (
4312#ifdef TEST_DROP_MCAST_PERCENTAGE
4335 if (!memb_set_subset (
4394 update_aru (instance);
4403static int message_handler_memb_merge_detect (
4445 if (!memb_set_subset (
4469static void memb_join_process (
4475 int gather_entered = 0;
4534 if (memb_consensus_agreed (instance) && instance->
failed_to_recv == 1) {
4540 memb_state_commit_token_create (instance);
4542 memb_state_commit_enter (instance);
4545 if (memb_consensus_agreed (instance) &&
4546 memb_lowest_in_config (instance)) {
4548 memb_state_commit_token_create (instance);
4550 memb_state_commit_enter (instance);
4576 if (memb_set_subset (
4577 &instance->
my_id, 1,
4584 if (memb_set_subset (
4589 if (memb_set_subset (
4617 if (gather_entered == 0 &&
4634 out->header.type =
in->header.type;
4636 out->system_from = srp_addr_endian_convert(
in->system_from);
4637 out->proc_list_entries =
swab32 (
in->proc_list_entries);
4638 out->failed_list_entries =
swab32 (
in->failed_list_entries);
4646 for (
i = 0;
i <
out->proc_list_entries;
i++) {
4649 for (
i = 0;
i <
out->failed_list_entries;
i++) {
4664 out->header.type =
in->header.type;
4675 for (
i = 0;
i <
out->addr_entries;
i++) {
4699 out->header.type =
in->header.type;
4710 out->rtr_list_entries =
swab32 (
in->rtr_list_entries);
4711 for (
i = 0;
i <
out->rtr_list_entries;
i++) {
4712 out->rtr_list[
i].ring_id.rep =
swab32(
in->rtr_list[
i].ring_id.rep);
4713 out->rtr_list[
i].ring_id.seq =
swab64 (
in->rtr_list[
i].ring_id.seq);
4718static void mcast_endian_convert (
const struct mcast *
in,
struct mcast *
out)
4722 out->header.type =
in->header.type;
4724 out->header.encapsulated =
in->header.encapsulated;
4732 out->system_from = srp_addr_endian_convert(
in->system_from);
4735static void memb_merge_detect_endian_convert (
4741 out->header.type =
in->header.type;
4745 out->system_from = srp_addr_endian_convert (
in->system_from);
4748static int ignore_join_under_operational (
4762 if (memb_set_subset (&instance->
my_id, 1,
4780static int message_handler_memb_join (
4809 if (pause_flush (instance)) {
4818 if (!ignore_join_under_operational (instance,
memb_join)) {
4819 memb_join_process (instance,
memb_join);
4824 memb_join_process (instance,
memb_join);
4835 memb_join_process (instance,
memb_join);
4848 memb_join_process (instance,
memb_join);
4849 memb_recovery_state_token_loss (instance);
4857static int message_handler_memb_commit_token (
4871 "got commit token");
4885#ifdef TEST_DROP_COMMIT_TOKEN_PERCENTAGE
4900 if (memb_set_equal (
addr,
4907 memb_state_commit_enter (instance);
4936 "Sending initial ORF token");
4939 orf_token_send_initial (instance);
4940 reset_token_timeout (instance);
4941 reset_token_retransmit_timeout (instance);
4948static int message_handler_token_hold_cancel (
4965 timer_function_token_retransmit_timeout (instance);
4971static int check_message_header_validity(
4974 unsigned int msg_len,
4984 "Message received from %s is too short... Ignoring %u.",
5018 guessed_str =
"unencrypted Corosync 2.0/2.1/1.x/OpenAIS";
5029 guessed_str =
"encrypted Kronosnet/Corosync 2.0/2.1/1.x/OpenAIS or unknown";
5033 "Message received from %s has bad magic number (probably sent by %s).. Ignoring",
5042 "Message received from %s has unsupported version %u... Ignoring",
5056 unsigned int msg_len,
5062 if (check_message_header_validity(context, msg, msg_len,
system_from) == -1) {
5087 "Message received from %s has wrong type... ignoring %d.\n",
5107 unsigned short ip_port,
5153 "Created or loaded sequence id " CS_PRI_RING_ID " for this ring.",
5269 timer_function_orf_token_timeout(context);
totem_configuration_type
The totem_configuration_type enum.
@ TOTEM_CONFIGURATION_REGULAR
@ TOTEM_CONFIGURATION_TRANSITIONAL
#define MESSAGE_QUEUE_MAX
unsigned char addr[TOTEMIP_ADDRLEN]
totem_callback_token_type
The totem_callback_token_type enum.
@ TOTEM_CALLBACK_TOKEN_SENT
@ TOTEM_CALLBACK_TOKEN_RECEIVED
#define PROCESSOR_COUNT_MAX
#define CS_PRI_RING_ID_SEQ
icmap_map_t icmap_get_global_map(void)
Return global icmap.
#define LOGSYS_LEVEL_DEBUG
struct memb_ring_id ring_id
struct totem_message_header header
struct srp_addr system_from
unsigned int received_flg
struct memb_ring_id ring_id
unsigned int high_delivered
struct totem_message_header header
unsigned char end_of_commit_token[0]
struct memb_ring_id ring_id
struct srp_addr system_from
struct totem_message_header header
unsigned char end_of_memb_join[0]
unsigned long long ring_seq
unsigned int failed_list_entries
unsigned int proc_list_entries
struct totem_message_header header
struct memb_ring_id ring_id
struct srp_addr system_from
int(* handler_functions[6])(struct totemsrp_instance *instance, const void *msg, size_t msg_len, int endian_conversion_needed)
struct rtr_item rtr_list[0]
struct totem_message_header header
struct memb_ring_id ring_id
struct memb_ring_id ring_id
int(* callback_fn)(enum totem_callback_token_type type, const void *)
enum totem_callback_token_type callback_type
struct totem_message_header header
struct memb_ring_id ring_id
unsigned int max_messages
unsigned int heartbeat_failures_allowed
unsigned int token_timeout
struct totem_logging_configuration totem_logging_configuration
unsigned int downcheck_timeout
unsigned int miss_count_const
struct totem_interface * interfaces
unsigned int cancel_token_hold_on_retransmit
unsigned int fail_to_recv_const
unsigned int merge_timeout
struct totem_interface * orig_interfaces
void(* totem_memb_ring_id_create_or_load)(struct memb_ring_id *memb_ring_id, unsigned int nodeid)
unsigned int token_retransmits_before_loss_const
unsigned int max_network_delay
unsigned int seqno_unchanged_const
unsigned int consensus_timeout
unsigned int send_join_timeout
void(* totem_memb_ring_id_store)(const struct memb_ring_id *memb_ring_id, unsigned int nodeid)
unsigned int token_retransmit_timeout
unsigned int token_warning
unsigned int join_timeout
unsigned int token_hold_timeout
struct totem_ip_address boundto
struct totem_ip_address member_list[PROCESSOR_COUNT_MAX]
struct totem_ip_address mcast_addr
The totem_ip_address struct.
void(* log_printf)(int level, int subsys, const char *function_name, const char *file_name, int file_line, const char *format,...) __attribute__((format(printf
void(*) in log_level_security)
struct totem_ip_address mcast_address
struct srp_addr my_left_memb_list[PROCESSOR_COUNT_MAX]
int consensus_list_entries
int my_merge_detect_timeout_outstanding
qb_loop_timer_handle timer_heartbeat_timeout
unsigned int my_token_seq
qb_loop_timer_handle memb_timer_state_gather_join_timeout
struct consensus_list_item consensus_list[PROCESSOR_COUNT_MAX]
qb_loop_timer_handle memb_timer_state_gather_consensus_timeout
uint32_t threaded_mode_enabled
struct srp_addr my_memb_list[PROCESSOR_COUNT_MAX]
int my_leave_memb_entries
struct srp_addr my_proc_list[PROCESSOR_COUNT_MAX]
struct srp_addr my_new_memb_list[PROCESSOR_COUNT_MAX]
int my_failed_list_entries
struct srp_addr my_failed_list[PROCESSOR_COUNT_MAX]
unsigned int use_heartbeat
void(* memb_ring_id_create_or_load)(struct memb_ring_id *memb_ring_id, unsigned int nodeid)
void(*) enum memb_stat memb_state)
qb_loop_timer_handle memb_timer_state_commit_timeout
struct cs_queue new_message_queue
int orf_token_retransmit_size
unsigned int my_high_seq_received
void(* totemsrp_deliver_fn)(unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required)
uint32_t orf_token_discard
struct qb_list_head token_callback_sent_listhead
unsigned int last_released
int totemsrp_log_level_notice
struct cs_queue new_message_queue_trans
int totemsrp_log_level_trace
char orf_token_retransmit[TOKEN_SIZE_MAX]
struct cs_queue retrans_message_queue
struct memb_ring_id my_ring_id
int totemsrp_log_level_error
void(* totemsrp_waiting_trans_ack_cb_fn)(int waiting_trans_ack)
unsigned int old_ring_state_high_seq_received
qb_loop_timer_handle timer_orf_token_hold_retransmit_timeout
qb_loop_timer_handle timer_pause_timeout
unsigned int my_high_ring_delivered
qb_loop_timer_handle timer_orf_token_retransmit_timeout
struct totem_config * totem_config
int my_deliver_memb_entries
void(* totemsrp_service_ready_fn)(void)
int my_trans_memb_entries
uint32_t originated_orf_token
void * token_recv_event_handle
struct sq recovery_sort_queue
qb_loop_timer_handle timer_orf_token_timeout
qb_loop_timer_handle timer_merge_detect_timeout
unsigned int my_leave_memb_list[PROCESSOR_COUNT_MAX]
void * token_sent_event_handle
unsigned int my_high_delivered
int totemsrp_log_level_security
int totemsrp_log_level_warning
struct memb_commit_token * commit_token
char commit_token_storage[40000]
struct memb_ring_id my_old_ring_id
void(* memb_ring_id_store)(const struct memb_ring_id *memb_ring_id, unsigned int nodeid)
qb_loop_t * totemsrp_poll_handle
unsigned int my_install_seq
qb_loop_timer_handle timer_orf_token_warning
struct srp_addr my_trans_memb_list[PROCESSOR_COUNT_MAX]
struct qb_list_head token_callback_received_listhead
uint32_t waiting_trans_ack
void(* totemsrp_log_printf)(int level, int subsys, const char *function, const char *file, int line, const char *format,...) __attribute__((format(printf
void(* totemsrp_confchg_fn)(enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id)
struct sq regular_sort_queue
unsigned long long token_ring_id_seq
struct srp_addr my_deliver_memb_list[PROCESSOR_COUNT_MAX]
int totemsrp_log_level_debug
struct totem_ip_address my_addrs[INTERFACE_MAX]
uint32_t continuous_gather
uint64_t recovery_entered
uint64_t memb_commit_token_rx
uint64_t memb_commit_token_tx
uint64_t operational_token_lost
uint64_t operational_entered
uint64_t gather_token_lost
uint64_t commit_token_lost
uint64_t token_hold_cancel_tx
totemsrp_token_stats_t token[TOTEM_TOKEN_STATS_MAX]
uint64_t recovery_token_lost
uint64_t memb_merge_detect_rx
uint64_t memb_merge_detect_tx
uint64_t token_hold_cancel_rx
uint64_t consensus_timeouts
#define swab64(x)
The swab64 macro.
#define swab16(x)
The swab16 macro.
#define swab32(x)
The swab32 macro.
cfg_message_crypto_reconfig_phase_t
#define TOTEM_NODE_STATUS_STRUCTURE_VERSION
int totemconfig_commit_new_params(struct totem_config *totem_config, icmap_map_t map)
const char * totemip_sa_print(const struct sockaddr *sa)
void totemip_copy(struct totem_ip_address *addr1, const struct totem_ip_address *addr2)
int totemnet_initialize(qb_loop_t *loop_pt, void **net_context, struct totem_config *totem_config, totemsrp_stats_t *stats, void *context, int(*deliver_fn)(void *context, const void *msg, unsigned int msg_len, const struct sockaddr_storage *system_from), int(*iface_change_fn)(void *context, const struct totem_ip_address *iface_address, unsigned int ring_no), void(*mtu_changed)(void *context, int net_mtu), void(*target_set_completed)(void *context))
int totemnet_iface_set(void *net_context, const struct totem_ip_address *interface_addr, unsigned short ip_port, unsigned int iface_no)
int totemnet_member_remove(void *net_context, const struct totem_ip_address *member, int ring_no)
void * totemnet_buffer_alloc(void *net_context)
int totemnet_token_send(void *net_context, const void *msg, unsigned int msg_len)
int totemnet_send_flush(void *net_context)
void totemnet_buffer_release(void *net_context, void *ptr)
int totemnet_mcast_flush_send(void *net_context, const void *msg, unsigned int msg_len)
int totemnet_member_add(void *net_context, const struct totem_ip_address *local, const struct totem_ip_address *member, int ring_no)
int totemnet_finalize(void *net_context)
int totemnet_crypto_set(void *net_context, const char *cipher_type, const char *hash_type)
int totemnet_ifaces_get(void *net_context, char ***status, unsigned int *iface_count)
int totemnet_processor_count_set(void *net_context, int processor_count)
int totemnet_token_target_set(void *net_context, unsigned int nodeid)
int totemnet_recv_flush(void *net_context)
int totemnet_iface_check(void *net_context)
int totemnet_mcast_noflush_send(void *net_context, const void *msg, unsigned int msg_len)
int totemnet_recv_mcast_empty(void *net_context)
int totemnet_nodestatus_get(void *net_context, unsigned int nodeid, struct totem_node_status *node_status)
void totemnet_stats_clear(void *net_context)
int totemnet_reconfigure(void *net_context, struct totem_config *totem_config)
int totemnet_crypto_reconfigure_phase(void *net_context, struct totem_config *totem_config, cfg_message_crypto_reconfig_phase_t phase)
Totem Network interface - also does encryption/decryption.
int totemsrp_my_family_get(void *srp_context)
#define SEQNO_START_TOKEN
int main_iface_change_fn(void *context, const struct totem_ip_address *iface_address, unsigned int iface_no)
unsigned long long ring_seq
#define RETRANSMIT_ENTRIES_MAX
#define log_printf(level, format, args...)
void totemsrp_force_gather(void *context)
void totemsrp_service_ready_register(void *context, void(*totem_service_ready)(void))
int totemsrp_initialize(qb_loop_t *poll_handle, void **srp_context, struct totem_config *totem_config, totempg_stats_t *stats, void(*deliver_fn)(unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required), void(*confchg_fn)(enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id), void(*waiting_trans_ack_cb_fn)(int waiting_trans_ack))
Create a protocol instance.
int totemsrp_callback_token_create(void *srp_context, void **handle_out, enum totem_callback_token_type type, int delete, int(*callback_fn)(enum totem_callback_token_type type, const void *), const void *data)
#define RETRANS_MESSAGE_QUEUE_SIZE_MAX
void totemsrp_threaded_mode_enable(void *context)
struct rtr_item rtr_list[0]
@ MESSAGE_TYPE_MEMB_COMMIT_TOKEN
@ MESSAGE_TYPE_TOKEN_HOLD_CANCEL
@ MESSAGE_TYPE_MEMB_MERGE_DETECT
void totemsrp_net_mtu_adjust(struct totem_config *totem_config)
@ MESSAGE_NOT_ENCAPSULATED
unsigned int failed_list_entries
struct message_handlers totemsrp_message_handlers
int totemsrp_nodestatus_get(void *srp_context, unsigned int nodeid, struct totem_node_status *node_status)
#define LEAVE_DUMMY_NODEID
#define QUEUE_RTR_ITEMS_SIZE_MAX
@ TOTEMSRP_GSFROM_THE_CONSENSUS_TIMEOUT_EXPIRED
@ TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_GATHER_STATE
@ TOTEMSRP_GSFROM_FAILED_TO_RECEIVE
@ TOTEMSRP_GSFROM_CONSENSUS_TIMEOUT
@ TOTEMSRP_GSFROM_MERGE_DURING_OPERATIONAL_STATE
@ TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_OPERATIONAL_STATE
@ TOTEMSRP_GSFROM_MERGE_DURING_JOIN
@ TOTEMSRP_GSFROM_INTERFACE_CHANGE
@ TOTEMSRP_GSFROM_GATHER_MISSING1
@ TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_COMMIT_STATE
@ TOTEMSRP_GSFROM_JOIN_DURING_COMMIT_STATE
@ TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_RECOVERY_STATE
@ TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_OPERATIONAL_STATE
@ TOTEMSRP_GSFROM_JOIN_DURING_OPERATIONAL_STATE
@ TOTEMSRP_GSFROM_MERGE_DURING_GATHER_STATE
@ TOTEMSRP_GSFROM_JOIN_DURING_RECOVERY
int totemsrp_crypto_set(void *srp_context, const char *cipher_type, const char *hash_type)
int totemsrp_avail(void *srp_context)
Return number of available messages that can be queued.
void totemsrp_stats_clear(void *context, int flags)
int totemsrp_iface_set(void *context, const struct totem_ip_address *interface_addr, unsigned short ip_port, unsigned int iface_no)
void totemsrp_finalize(void *srp_context)
struct memb_ring_id ring_id
void totemsrp_trans_ack(void *context)
int totemsrp_crypto_reconfigure_phase(void *context, struct totem_config *totem_config, cfg_message_crypto_reconfig_phase_t phase)
unsigned int totemsrp_my_nodeid_get(void *srp_context)
void totemsrp_event_signal(void *srp_context, enum totem_event_type type, int value)
void totemsrp_callback_token_destroy(void *srp_context, void **handle_out)
unsigned int received_flg
struct message_item __attribute__
int main_deliver_fn(void *context, const void *msg, unsigned int msg_len, const struct sockaddr_storage *system_from)
int totemsrp_member_add(void *context, const struct totem_ip_address *member, int iface_no)
int totemsrp_ifaces_get(void *srp_context, unsigned int nodeid, unsigned int *interface_id, struct totem_ip_address *interfaces, unsigned int interfaces_size, char ***status, unsigned int *iface_count)
int totemsrp_reconfigure(void *context, struct totem_config *totem_config)
unsigned int high_delivered
struct srp_addr system_from
unsigned int proc_list_entries
const char * gather_state_from_desc[]
int totemsrp_member_remove(void *context, const struct totem_ip_address *member, int iface_no)
int totemsrp_mcast(void *srp_context, struct iovec *iovec, unsigned int iov_len, int guarantee)
Multicast a message.
Totem Single Ring Protocol.
#define TOTEMPG_STATS_CLEAR_TRANSPORT
#define TOTEM_TOKEN_STATS_MAX