corosync 3.1.9
totemsrp.c
Go to the documentation of this file.
1/*
2 * Copyright (c) 2003-2006 MontaVista Software, Inc.
3 * Copyright (c) 2006-2018 Red Hat, Inc.
4 *
5 * All rights reserved.
6 *
7 * Author: Steven Dake (sdake@redhat.com)
8 *
9 * This software licensed under BSD license, the text of which follows:
10 *
11 * Redistribution and use in source and binary forms, with or without
12 * modification, are permitted provided that the following conditions are met:
13 *
14 * - Redistributions of source code must retain the above copyright notice,
15 * this list of conditions and the following disclaimer.
16 * - Redistributions in binary form must reproduce the above copyright notice,
17 * this list of conditions and the following disclaimer in the documentation
18 * and/or other materials provided with the distribution.
19 * - Neither the name of the MontaVista Software, Inc. nor the names of its
20 * contributors may be used to endorse or promote products derived from this
21 * software without specific prior written permission.
22 *
23 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
24 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
25 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
26 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
27 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
28 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
29 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
30 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
31 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
32 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
33 * THE POSSIBILITY OF SUCH DAMAGE.
34 */
35
36/*
37 * The first version of this code was based upon Yair Amir's PhD thesis:
38 * https://corosync.github.io/corosync/doc/Yair_phd.ps.gz (ch4,5).
39 *
40 * The current version of totemsrp implements the Totem protocol specified in:
41 * https://corosync.github.io/corosync/doc/tocssrp95.ps.gz
42 *
43 * The deviations from the above published protocols are:
44 * - token hold mode where token doesn't rotate on unused ring - reduces cpu
45 * usage on 1.6ghz xeon from 35% to less than .1 % as measured by top
46 */
47
48#include <config.h>
49
50#include <assert.h>
51#ifdef HAVE_ALLOCA_H
52#include <alloca.h>
53#endif
54#include <sys/mman.h>
55#include <sys/types.h>
56#include <sys/stat.h>
57#include <sys/socket.h>
58#include <netdb.h>
59#include <sys/un.h>
60#include <sys/ioctl.h>
61#include <sys/param.h>
62#include <netinet/in.h>
63#include <arpa/inet.h>
64#include <unistd.h>
65#include <fcntl.h>
66#include <stdlib.h>
67#include <stdio.h>
68#include <errno.h>
69#include <sched.h>
70#include <time.h>
71#include <sys/time.h>
72#include <sys/poll.h>
73#include <sys/uio.h>
74#include <limits.h>
75
76#include <qb/qblist.h>
77#include <qb/qbdefs.h>
78#include <qb/qbutil.h>
79#include <qb/qbloop.h>
80
81#include <corosync/swab.h>
82#include <corosync/sq.h>
83
84#define LOGSYS_UTILS_ONLY 1
85#include <corosync/logsys.h>
86
87#include "totemsrp.h"
88#include "totemnet.h"
89
90#include "icmap.h"
91#include "totemconfig.h"
92
93#include "cs_queue.h"
94
/*
 * Compile-time limits and constants used by totemsrp.
 *
 * RETRANS_MESSAGE_QUEUE_SIZE_MAX is the capacity passed to cs_queue_init()
 * for the instance's retrans_message_queue.
 */
95#define LOCALHOST_IP inet_addr("127.0.0.1")
96#define QUEUE_RTR_ITEMS_SIZE_MAX 16384 /* allow 16384 retransmit items */
97#define RETRANS_MESSAGE_QUEUE_SIZE_MAX 16384 /* allow 16384 messages to be queued */
98#define RECEIVED_MESSAGE_QUEUE_SIZE_MAX 500 /* allow 500 messages to be queued */
99#define MAXIOVS 5
100#define RETRANSMIT_ENTRIES_MAX 30
101#define TOKEN_SIZE_MAX 64000 /* bytes */
102#define LEAVE_DUMMY_NODEID 0
103
104/*
105 * SRP address.
106 */
107struct srp_addr {
 /*
  * Node identifier. It is the only member: srp_addr_equal() compares it
  * and srp_addr_endian_convert() passes it through swab32().
  */
108 unsigned int nodeid;
109};
110
111/*
112 * Rollover handling:
113 * SEQNO_START_MSG is the starting sequence number after a new configuration
114 * This should remain zero, unless testing overflow in which case
115 * 0x7ffff000 and 0xfffff000 are good starting values.
116 *
117 * SEQNO_START_TOKEN is the starting sequence number after a new configuration
118 * for a token. This should remain zero, unless testing overflow in which
119 * case 0x7fffff00 or 0xffffff00 are good starting values.
120 */
121#define SEQNO_START_MSG 0x0
122#define SEQNO_START_TOKEN 0x0
123
124/*
125 * These can be used to test different rollover points
126 * #define SEQNO_START_MSG 0xfffffe00
127 * #define SEQNO_START_TOKEN 0xfffffe00
128 */
129
130/*
131 * These can be used to test the error recovery algorithms
132 * #define TEST_DROP_ORF_TOKEN_PERCENTAGE 30
133 * #define TEST_DROP_COMMIT_TOKEN_PERCENTAGE 30
134 * #define TEST_DROP_MCAST_PERCENTAGE 50
135 * #define TEST_RECOVERY_MSG_COUNT 300
136 */
137
138/*
139 * we compare incoming messages to determine if their endian is
140 * different - if so convert them
141 *
142 * do not change
143 */
144#define ENDIAN_LOCAL 0xff22
145
147 MESSAGE_TYPE_ORF_TOKEN = 0, /* Ordering, Reliability, Flow (ORF) control Token */
148 MESSAGE_TYPE_MCAST = 1, /* ring ordered multicast message */
149 MESSAGE_TYPE_MEMB_MERGE_DETECT = 2, /* merge rings if there are available rings */
150 MESSAGE_TYPE_MEMB_JOIN = 3, /* membership join message */
151 MESSAGE_TYPE_MEMB_COMMIT_TOKEN = 4, /* membership commit token */
152 MESSAGE_TYPE_TOKEN_HOLD_CANCEL = 5, /* cancel the holding of the token */
153};
154
159
160/*
161 * New membership algorithm local variables
162 */
165 int set;
166};
167
168
176
177
179 int mcast;
180 int token;
181};
182
192
193
194struct rtr_item {
196 unsigned int seq;
198
199
200struct orf_token {
202 unsigned int seq;
203 unsigned int token_seq;
204 unsigned int aru;
205 unsigned int aru_addr;
207 unsigned int backlog;
208 unsigned int fcc;
213
214
215struct memb_join {
218 unsigned int proc_list_entries;
220 unsigned long long ring_seq;
221 unsigned char end_of_memb_join[0];
222/*
223 * These parts of the data structure are dynamic:
224 * struct srp_addr proc_list[];
225 * struct srp_addr failed_list[];
226 */
228
229
235
236
241
242
249
250
253 unsigned int token_seq;
255 unsigned int retrans_flg;
258 unsigned char end_of_commit_token[0];
259/*
260 * These parts of the data structure are dynamic:
261 *
262 * struct srp_addr addr[PROCESSOR_COUNT_MAX];
263 * struct memb_commit_token_memb_entry memb_list[PROCESSOR_COUNT_MAX];
264 */
266
268 struct mcast *mcast;
269 unsigned int msg_len;
271
273 struct mcast *mcast;
274 unsigned int msg_len;
275};
276
283
286
288
289 /*
290 * Flow control mcasts and remcasts on last and current orf_token
291 */
293
295
297
299
301
303
305
307
309
311
313
315
317
319
321
323
325
327
329
331
333
335
337
339
341
343
345
347
348 unsigned int my_last_aru;
349
351
353
355
356 unsigned int my_install_seq;
357
359
361
363
365
367
368 /*
369 * Queues used to order, deliver, and recover messages
370 */
372
374
376
378
380
381 /*
382 * Received up to and including
383 */
384 unsigned int my_aru;
385
386 unsigned int my_high_delivered;
387
389
391
393
395
396 unsigned int my_token_seq;
397
398 /*
399 * Timers
400 */
402
404
406
408
410
412
414
416
418
420
421 /*
422 * Function and data used to log messages
423 */
425
427
429
431
433
435
437
439 int level,
440 int subsys,
441 const char *function,
442 const char *file,
443 int line,
444 const char *format, ...)__attribute__((format(printf, 6, 7)));;
445
447
448//TODO struct srp_addr next_memb;
449
451
453
455 unsigned int nodeid,
456 const void *msg,
457 unsigned int msg_len,
459
462 const unsigned int *member_list, size_t member_list_entries,
463 const unsigned int *left_list, size_t left_list_entries,
464 const unsigned int *joined_list, size_t joined_list_entries,
465 const struct memb_ring_id *ring_id);
466
468
471
474 unsigned int nodeid);
475
477 const struct memb_ring_id *memb_ring_id,
478 unsigned int nodeid);
479
481
483
484 unsigned long long token_ring_id_seq;
485
486 unsigned int last_released;
487
488 unsigned int set_aru;
489
491
493
495
496 unsigned int my_last_seq;
497
499
501
503
504 unsigned int use_heartbeat;
505
506 unsigned int my_trc;
507
508 unsigned int my_pbl;
509
510 unsigned int my_cbl;
511
513
515
517
519
521
523
525
527
531};
532
534 int count;
536 struct totemsrp_instance *instance,
537 const void *msg,
538 size_t msg_len,
540};
541
561
562const char* gather_state_from_desc [] = {
563 [TOTEMSRP_GSFROM_CONSENSUS_TIMEOUT] = "consensus timeout",
565 [TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_OPERATIONAL_STATE] = "The token was lost in the OPERATIONAL state.",
566 [TOTEMSRP_GSFROM_THE_CONSENSUS_TIMEOUT_EXPIRED] = "The consensus timeout expired.",
567 [TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_COMMIT_STATE] = "The token was lost in the COMMIT state.",
568 [TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_RECOVERY_STATE] = "The token was lost in the RECOVERY state.",
569 [TOTEMSRP_GSFROM_FAILED_TO_RECEIVE] = "failed to receive",
570 [TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_OPERATIONAL_STATE] = "foreign message in operational state",
571 [TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_GATHER_STATE] = "foreign message in gather state",
572 [TOTEMSRP_GSFROM_MERGE_DURING_OPERATIONAL_STATE] = "merge during operational state",
573 [TOTEMSRP_GSFROM_MERGE_DURING_GATHER_STATE] = "merge during gather state",
574 [TOTEMSRP_GSFROM_MERGE_DURING_JOIN] = "merge during join",
575 [TOTEMSRP_GSFROM_JOIN_DURING_OPERATIONAL_STATE] = "join during operational state",
576 [TOTEMSRP_GSFROM_JOIN_DURING_COMMIT_STATE] = "join during commit state",
577 [TOTEMSRP_GSFROM_JOIN_DURING_RECOVERY] = "join during recovery",
578 [TOTEMSRP_GSFROM_INTERFACE_CHANGE] = "interface change",
579};
580
581/*
582 * forward decls
583 */
584static int message_handler_orf_token (
585 struct totemsrp_instance *instance,
586 const void *msg,
587 size_t msg_len,
589
590static int message_handler_mcast (
591 struct totemsrp_instance *instance,
592 const void *msg,
593 size_t msg_len,
595
596static int message_handler_memb_merge_detect (
597 struct totemsrp_instance *instance,
598 const void *msg,
599 size_t msg_len,
601
602static int message_handler_memb_join (
603 struct totemsrp_instance *instance,
604 const void *msg,
605 size_t msg_len,
607
608static int message_handler_memb_commit_token (
609 struct totemsrp_instance *instance,
610 const void *msg,
611 size_t msg_len,
613
614static int message_handler_token_hold_cancel (
615 struct totemsrp_instance *instance,
616 const void *msg,
617 size_t msg_len,
619
620static void totemsrp_instance_initialize (struct totemsrp_instance *instance);
621
622static void srp_addr_to_nodeid (
623 struct totemsrp_instance *instance,
624 unsigned int *nodeid_out,
625 struct srp_addr *srp_addr_in,
626 unsigned int entries);
627
628static int srp_addr_equal (const struct srp_addr *a, const struct srp_addr *b);
629
630static void memb_leave_message_send (struct totemsrp_instance *instance);
631
632static void token_callbacks_execute (struct totemsrp_instance *instance, enum totem_callback_token_type type);
633static void memb_state_gather_enter (struct totemsrp_instance *instance, enum gather_state_from gather_from);
634static void messages_deliver_to_app (struct totemsrp_instance *instance, int skip, unsigned int end_point);
635static int orf_token_mcast (struct totemsrp_instance *instance, struct orf_token *oken,
637static void messages_free (struct totemsrp_instance *instance, unsigned int token_aru);
638
639static void memb_ring_id_set (struct totemsrp_instance *instance,
640 const struct memb_ring_id *ring_id);
641static void target_set_completed (void *context);
642static void memb_state_commit_token_update (struct totemsrp_instance *instance);
643static void memb_state_commit_token_target_set (struct totemsrp_instance *instance);
644static int memb_state_commit_token_send (struct totemsrp_instance *instance);
645static int memb_state_commit_token_send_recovery (struct totemsrp_instance *instance, struct memb_commit_token *memb_commit_token);
646static void memb_state_commit_token_create (struct totemsrp_instance *instance);
647static int token_hold_cancel_send (struct totemsrp_instance *instance);
648static void orf_token_endian_convert (const struct orf_token *in, struct orf_token *out);
649static void memb_commit_token_endian_convert (const struct memb_commit_token *in, struct memb_commit_token *out);
650static void memb_join_endian_convert (const struct memb_join *in, struct memb_join *out);
651static void mcast_endian_convert (const struct mcast *in, struct mcast *out);
652static void memb_merge_detect_endian_convert (
653 const struct memb_merge_detect *in,
654 struct memb_merge_detect *out);
655static struct srp_addr srp_addr_endian_convert (struct srp_addr in);
656static void timer_function_orf_token_timeout (void *data);
657static void timer_function_orf_token_warning (void *data);
658static void timer_function_pause_timeout (void *data);
659static void timer_function_heartbeat_timeout (void *data);
660static void timer_function_token_retransmit_timeout (void *data);
661static void timer_function_token_hold_retransmit_timeout (void *data);
662static void timer_function_merge_detect_timeout (void *data);
663static void *totemsrp_buffer_alloc (struct totemsrp_instance *instance);
664static void totemsrp_buffer_release (struct totemsrp_instance *instance, void *ptr);
665static const char* gsfrom_to_msg(enum gather_state_from gsfrom);
666
667int main_deliver_fn (
668 void *context,
669 const void *msg,
670 unsigned int msg_len,
671 const struct sockaddr_storage *system_from);
672
674 void *context,
675 const struct totem_ip_address *iface_address,
676 unsigned int iface_no);
677
679 6,
680 {
681 message_handler_orf_token, /* MESSAGE_TYPE_ORF_TOKEN */
682 message_handler_mcast, /* MESSAGE_TYPE_MCAST */
683 message_handler_memb_merge_detect, /* MESSAGE_TYPE_MEMB_MERGE_DETECT */
684 message_handler_memb_join, /* MESSAGE_TYPE_MEMB_JOIN */
685 message_handler_memb_commit_token, /* MESSAGE_TYPE_MEMB_COMMIT_TOKEN */
686 message_handler_token_hold_cancel /* MESSAGE_TYPE_TOKEN_HOLD_CANCEL */
687 }
688};
689
/*
 * Log a printf-style message through the instance's logging callback.
 *
 * A variable named `instance` must be in scope at the call site. Its
 * totemsrp_log_printf callback is invoked with the given level,
 * instance->totemsrp_subsys_id, the calling function, file and line,
 * followed by the format string and its arguments.
 *
 * The expansion intentionally does NOT end with a semicolon: the caller's
 * own `;` terminates the statement. A trailing `;` inside the macro would
 * expand to two statements and break an unbraced
 *     if (cond) log_printf (...); else ...
 */
#define log_printf(level, format, args...)		\
do {							\
	instance->totemsrp_log_printf (			\
		level, instance->totemsrp_subsys_id,	\
		__FUNCTION__, __FILE__, __LINE__,	\
		format, ##args);			\
} while (0)
/*
 * Log a message followed by the textual form of an error number.
 *
 * err_num is converted to text with qb_strerror_r() into a local buffer of
 * LOGSYS_MAX_PERROR_MSG_LEN bytes, and the message is emitted through
 * instance->totemsrp_log_printf with ": %s (%d)\n" appended to fmt.
 *
 * fmt is pasted next to a string literal, so it must itself be a string
 * literal. A variable named `instance` must be in scope at the call site.
 * err_num appears twice in the expansion, so it is evaluated twice.
 */
697#define LOGSYS_PERROR(err_num, level, fmt, args...) \
698do { \
699 char _error_str[LOGSYS_MAX_PERROR_MSG_LEN]; \
700 const char *_error_ptr = qb_strerror_r(err_num, _error_str, sizeof(_error_str)); \
701 instance->totemsrp_log_printf ( \
702 level, instance->totemsrp_subsys_id, \
703 __FUNCTION__, __FILE__, __LINE__, \
704 fmt ": %s (%d)\n", ##args, _error_ptr, err_num); \
705 } while(0)
706
707static const char* gsfrom_to_msg(enum gather_state_from gsfrom)
708{
711 }
712 else {
713 return "UNKNOWN";
714 }
715}
716
717static void totemsrp_instance_initialize (struct totemsrp_instance *instance)
718{
719 memset (instance, 0, sizeof (struct totemsrp_instance));
720
722
724
725 instance->my_received_flg = 1;
726
727 instance->my_token_seq = SEQNO_START_TOKEN - 1;
728
730
731 instance->set_aru = -1;
732
733 instance->my_aru = SEQNO_START_MSG;
734
736
738
739 instance->orf_token_discard = 0;
740
741 instance->originated_orf_token = 0;
742
743 instance->commit_token = (struct memb_commit_token *)instance->commit_token_storage;
744
745 instance->waiting_trans_ack = 1;
746}
747
748static int pause_flush (struct totemsrp_instance *instance)
749{
752 int res = 0;
753
756
757 if ((now_msec - timestamp_msec) > (instance->totem_config->token_timeout / 2)) {
759 "Process pause detected for %d ms, flushing membership messages.", (unsigned int)(now_msec - timestamp_msec));
760 /*
761 * -1 indicates an error from recvmsg
762 */
763 do {
765 } while (res == -1);
766 }
767 return (res);
768}
769
770static int token_event_stats_collector (enum totem_callback_token_type type, const void *void_instance)
771{
772 struct totemsrp_instance *instance = (struct totemsrp_instance *)void_instance;
774
776
778 /* increment the latest token index */
779 if (instance->stats.latest_token == (TOTEM_TOKEN_STATS_MAX - 1))
780 instance->stats.latest_token = 0;
781 else
782 instance->stats.latest_token++;
783
784 if (instance->stats.earliest_token == instance->stats.latest_token) {
785 /* we have filled up the array, start overwriting */
786 if (instance->stats.earliest_token == (TOTEM_TOKEN_STATS_MAX - 1))
787 instance->stats.earliest_token = 0;
788 else
789 instance->stats.earliest_token++;
790
791 instance->stats.token[instance->stats.earliest_token].rx = 0;
792 instance->stats.token[instance->stats.earliest_token].tx = 0;
793 instance->stats.token[instance->stats.earliest_token].backlog_calc = 0;
794 }
795
796 instance->stats.token[instance->stats.latest_token].rx = time_now;
797 instance->stats.token[instance->stats.latest_token].tx = 0; /* in case we drop the token */
798 } else {
799 instance->stats.token[instance->stats.latest_token].tx = time_now;
800 }
801 return 0;
802}
803
804static void totempg_mtu_changed(void *context, int net_mtu)
805{
806 struct totemsrp_instance *instance = context;
807
808 instance->totem_config->net_mtu = net_mtu - 2 * sizeof (struct mcast);
809
811 "Net MTU changed to %d, new value is %d",
812 net_mtu, instance->totem_config->net_mtu);
813}
814
815/*
816 * Exported interfaces
817 */
819 qb_loop_t *poll_handle,
820 void **srp_context,
822 totempg_stats_t *stats,
823
824 void (*deliver_fn) (
825 unsigned int nodeid,
826 const void *msg,
827 unsigned int msg_len,
829
830 void (*confchg_fn) (
832 const unsigned int *member_list, size_t member_list_entries,
833 const unsigned int *left_list, size_t left_list_entries,
834 const unsigned int *joined_list, size_t joined_list_entries,
835 const struct memb_ring_id *ring_id),
837 int waiting_trans_ack))
838{
839 struct totemsrp_instance *instance;
840 int res;
841
842 instance = malloc (sizeof (struct totemsrp_instance));
843 if (instance == NULL) {
844 goto error_exit;
845 }
846
847 totemsrp_instance_initialize (instance);
848
851
852 stats->srp = &instance->stats;
853 instance->stats.latest_token = 0;
854 instance->stats.earliest_token = 0;
855
856 instance->totem_config = totem_config;
857
858 /*
859 * Configure logging
860 */
869
870 /*
871 * Configure totem store and load functions
872 */
875
876 /*
877 * Initialize local variables for totemsrp
878 */
880
881 /*
882 * Display totem configuration
883 */
885 "Token Timeout (%d ms) retransmit timeout (%d ms)",
890 "Token warning every %d ms (%d%% of Token Timeout)",
892 if (token_warning_ms < totem_config->token_retransmit_timeout)
894 "The token warning interval (%d ms) is less than the token retransmit timeout (%d ms) "
895 "which can lead to spurious token warnings. Consider increasing the token_warning parameter.",
897 } else {
899 "Token warnings disabled");
900 }
902 "token hold (%d ms) retransmits before loss (%d retrans)",
905 "join (%d ms) send_join (%d ms) consensus (%d ms) merge (%d ms)",
909
912 "downcheck (%d ms) fail to recv const (%d msgs)",
915 "seqno unchanged const (%d rotations) Maximum network MTU %d", totem_config->seqno_unchanged_const, totem_config->net_mtu);
916
918 "window size per rotation (%d messages) maximum messages per rotation (%d messages)",
920
922 "missed count const (%d messages)",
924
926 "send threads (%d threads)", totem_config->threads);
927
929 "heartbeat_failures_allowed (%d)", totem_config->heartbeat_failures_allowed);
931 "max_network_delay (%d ms)", totem_config->max_network_delay);
932
933
934 cs_queue_init (&instance->retrans_message_queue, RETRANS_MESSAGE_QUEUE_SIZE_MAX,
935 sizeof (struct message_item), instance->threaded_mode_enabled);
936
937 sq_init (&instance->regular_sort_queue,
939
940 sq_init (&instance->recovery_sort_queue,
942
943 instance->totemsrp_poll_handle = poll_handle;
944
945 instance->totemsrp_deliver_fn = deliver_fn;
946
947 instance->totemsrp_confchg_fn = confchg_fn;
948 instance->use_heartbeat = 1;
949
950 timer_function_pause_timeout (instance);
951
954 "HeartBeat is Disabled. To enable set heartbeat_failures_allowed > 0");
955 instance->use_heartbeat = 0;
956 }
957
958 if (instance->use_heartbeat) {
959 instance->heartbeat_timeout
962
963 if (instance->heartbeat_timeout >= totem_config->token_timeout) {
965 "total heartbeat_timeout (%d ms) is not less than token timeout (%d ms)",
966 instance->heartbeat_timeout,
969 "heartbeat_timeout = heartbeat_failures_allowed * token_retransmit_timeout + max_network_delay");
971 "heartbeat timeout should be less than the token timeout. Heartbeat is disabled!!");
972 instance->use_heartbeat = 0;
973 }
974 else {
976 "total heartbeat_timeout (%d ms)", instance->heartbeat_timeout);
977 }
978 }
979
981 poll_handle,
982 &instance->totemnet_context,
984 stats->srp,
985 instance,
988 totempg_mtu_changed,
989 target_set_completed);
990 if (res == -1) {
991 goto error_exit;
992 }
993
994 instance->my_id.nodeid = instance->totem_config->interfaces[instance->lowest_active_if].boundto.nodeid;
995
996 /*
997 * Must have net_mtu adjusted by totemnet_initialize first
998 */
999 cs_queue_init (&instance->new_message_queue,
1001 sizeof (struct message_item), instance->threaded_mode_enabled);
1002
1003 cs_queue_init (&instance->new_message_queue_trans,
1005 sizeof (struct message_item), instance->threaded_mode_enabled);
1006
1008 &instance->token_recv_event_handle,
1010 0,
1011 token_event_stats_collector,
1012 instance);
1014 &instance->token_sent_event_handle,
1016 0,
1017 token_event_stats_collector,
1018 instance);
1019 *srp_context = instance;
1020 return (0);
1021
1023 return (-1);
1024}
1025
1027 void *srp_context)
1028{
1029 struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1030
1031 memb_leave_message_send (instance);
1033 cs_queue_free (&instance->new_message_queue);
1034 cs_queue_free (&instance->new_message_queue_trans);
1035 cs_queue_free (&instance->retrans_message_queue);
1036 sq_free (&instance->regular_sort_queue);
1037 sq_free (&instance->recovery_sort_queue);
1038 free (instance);
1039}
1040
1042 void *srp_context,
1043 unsigned int nodeid,
1045{
1046 struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1047 int i;
1048
1050
1051 /* Fill in 'reachable' here as the lower level UDP[u] layers don't know */
1052 for (i = 0; i < instance->my_proc_list_entries; i++) {
1053 if (instance->my_proc_list[i].nodeid == nodeid) {
1054 node_status->reachable = 1;
1055 }
1056 }
1057
1059}
1060
1061
1062/*
1063 * Return configured interfaces. interfaces is array of totem_ip addresses allocated by caller,
1064 * with interfaces_size number of items. iface_count is final number of interfaces filled by this
1065 * function.
1066 *
1067 * Function returns 0 on success, otherwise if interfaces array is not big enough, -2 is returned,
1068 * and if interface was not found, -1 is returned.
1069 */
1071 void *srp_context,
1072 unsigned int nodeid,
1073 unsigned int *interface_id,
1074 struct totem_ip_address *interfaces,
1075 unsigned int interfaces_size,
1076 char ***status,
1077 unsigned int *iface_count)
1078{
1079 struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1080 struct totem_ip_address *iface_ptr = interfaces;
1081 int res = 0;
1082 int i,n;
1083 int num_ifs = 0;
1084
1085 memset(interfaces, 0, sizeof(struct totem_ip_address) * interfaces_size);
1087
1088 for (i=0; i<INTERFACE_MAX; i++) {
1089 for (n=0; n < instance->totem_config->interfaces[i].member_count; n++) {
1090 if (instance->totem_config->interfaces[i].configured &&
1092 memcpy(iface_ptr, &instance->totem_config->interfaces[i].member_list[n], sizeof(struct totem_ip_address));
1094 iface_ptr++;
1095 if (++num_ifs > interfaces_size) {
1096 res = -2;
1097 break;
1098 }
1099 }
1100 }
1101 }
1102
1105 return (res);
1106}
1107
1109 void *srp_context,
1110 const char *cipher_type,
1111 const char *hash_type)
1112{
1113 struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1114 int res;
1115
1117
1118 return (res);
1119}
1120
1121
1123 void *srp_context)
1124{
1125 struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1126 unsigned int res;
1127
1128 res = instance->my_id.nodeid;
1129
1130 return (res);
1131}
1132
1134 void *srp_context)
1135{
1136 struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1137 int res;
1138
1139 res = instance->totem_config->interfaces[instance->lowest_active_if].boundto.family;
1140
1141 return (res);
1142}
1143
1144
1145/*
1146 * Set operations for use by the membership algorithm
1147 */
1148static int srp_addr_equal (const struct srp_addr *a, const struct srp_addr *b)
1149{
1150 if (a->nodeid == b->nodeid) {
1151 return 1;
1152 }
1153 return 0;
1154}
1155
1156static void srp_addr_to_nodeid (
1157 struct totemsrp_instance *instance,
1158 unsigned int *nodeid_out,
1159 struct srp_addr *srp_addr_in,
1160 unsigned int entries)
1161{
1162 unsigned int i;
1163
1164 for (i = 0; i < entries; i++) {
1165 nodeid_out[i] = srp_addr_in[i].nodeid;
1166 }
1167}
1168
1169static struct srp_addr srp_addr_endian_convert (struct srp_addr in)
1170{
1171 struct srp_addr res;
1172
1173 res.nodeid = swab32 (in.nodeid);
1174
1175 return (res);
1176}
1177
1178static void memb_consensus_reset (struct totemsrp_instance *instance)
1179{
1180 instance->consensus_list_entries = 0;
1181}
1182
1183static void memb_set_subtract (
1184 struct srp_addr *out_list, int *out_list_entries,
1185 struct srp_addr *one_list, int one_list_entries,
1186 struct srp_addr *two_list, int two_list_entries)
1187{
1188 int found = 0;
1189 int i;
1190 int j;
1191
1192 *out_list_entries = 0;
1193
1194 for (i = 0; i < one_list_entries; i++) {
1195 for (j = 0; j < two_list_entries; j++) {
1196 if (srp_addr_equal (&one_list[i], &two_list[j])) {
1197 found = 1;
1198 break;
1199 }
1200 }
1201 if (found == 0) {
1204 }
1205 found = 0;
1206 }
1207}
1208
1209/*
1210 * Set consensus for a specific processor
1211 */
1212static void memb_consensus_set (
1213 struct totemsrp_instance *instance,
1214 const struct srp_addr *addr)
1215{
1216 int found = 0;
1217 int i;
1218
1219 for (i = 0; i < instance->consensus_list_entries; i++) {
1220 if (srp_addr_equal(addr, &instance->consensus_list[i].addr)) {
1221 found = 1;
1222 break; /* found entry */
1223 }
1224 }
1225 instance->consensus_list[i].addr = *addr;
1226 instance->consensus_list[i].set = 1;
1227 if (found == 0) {
1228 instance->consensus_list_entries++;
1229 }
1230 return;
1231}
1232
1233/*
1234 * Is consensus set for a specific processor
1235 */
1236static int memb_consensus_isset (
1237 struct totemsrp_instance *instance,
1238 const struct srp_addr *addr)
1239{
1240 int i;
1241
1242 for (i = 0; i < instance->consensus_list_entries; i++) {
1243 if (srp_addr_equal (addr, &instance->consensus_list[i].addr)) {
1244 return (instance->consensus_list[i].set);
1245 }
1246 }
1247 return (0);
1248}
1249
1250/*
1251 * Is consensus agreed upon based upon consensus database
1252 */
1253static int memb_consensus_agreed (
1254 struct totemsrp_instance *instance)
1255{
1257 int token_memb_entries = 0;
1258 int agreed = 1;
1259 int i;
1260
1261 memb_set_subtract (token_memb, &token_memb_entries,
1262 instance->my_proc_list, instance->my_proc_list_entries,
1263 instance->my_failed_list, instance->my_failed_list_entries);
1264
1265 for (i = 0; i < token_memb_entries; i++) {
1266 if (memb_consensus_isset (instance, &token_memb[i]) == 0) {
1267 agreed = 0;
1268 break;
1269 }
1270 }
1271
1272 if (agreed && instance->failed_to_recv == 1) {
1273 /*
1274 * Both nodes agreed on our failure. We don't care how many proc list items left because we
1275 * will create single ring anyway.
1276 */
1277
1278 return (agreed);
1279 }
1280
1282
1283 return (agreed);
1284}
1285
1286static void memb_consensus_notset (
1287 struct totemsrp_instance *instance,
1290 struct srp_addr *comparison_list,
1292{
1293 int i;
1294
1296
1297 for (i = 0; i < instance->my_proc_list_entries; i++) {
1298 if (memb_consensus_isset (instance, &instance->my_proc_list[i]) == 0) {
1301 }
1302 }
1303}
1304
1305/*
1306 * Is set1 equal to set2 Entries can be in different orders
1307 */
1308static int memb_set_equal (
1309 struct srp_addr *set1, int set1_entries,
1310 struct srp_addr *set2, int set2_entries)
1311{
1312 int i;
1313 int j;
1314
1315 int found = 0;
1316
1317 if (set1_entries != set2_entries) {
1318 return (0);
1319 }
1320 for (i = 0; i < set2_entries; i++) {
1321 for (j = 0; j < set1_entries; j++) {
1322 if (srp_addr_equal (&set1[j], &set2[i])) {
1323 found = 1;
1324 break;
1325 }
1326 }
1327 if (found == 0) {
1328 return (0);
1329 }
1330 found = 0;
1331 }
1332 return (1);
1333}
1334
1335/*
1336 * Is subset fully contained in fullset
1337 */
1338static int memb_set_subset (
1339 const struct srp_addr *subset, int subset_entries,
1340 const struct srp_addr *fullset, int fullset_entries)
1341{
1342 int i;
1343 int j;
1344 int found = 0;
1345
1347 return (0);
1348 }
1349 for (i = 0; i < subset_entries; i++) {
1350 for (j = 0; j < fullset_entries; j++) {
1351 if (srp_addr_equal (&subset[i], &fullset[j])) {
1352 found = 1;
1353 }
1354 }
1355 if (found == 0) {
1356 return (0);
1357 }
1358 found = 0;
1359 }
1360 return (1);
1361}
1362/*
1363 * merge subset into fullset taking care not to add duplicates
1364 */
1365static void memb_set_merge (
1366 const struct srp_addr *subset, int subset_entries,
1367 struct srp_addr *fullset, int *fullset_entries)
1368{
1369 int found = 0;
1370 int i;
1371 int j;
1372
1373 for (i = 0; i < subset_entries; i++) {
1374 for (j = 0; j < *fullset_entries; j++) {
1375 if (srp_addr_equal (&fullset[j], &subset[i])) {
1376 found = 1;
1377 break;
1378 }
1379 }
1380 if (found == 0) {
1383 }
1384 found = 0;
1385 }
1386 return;
1387}
1388
1389static void memb_set_and_with_ring_id (
1390 struct srp_addr *set1,
1392 int set1_entries,
1393 struct srp_addr *set2,
1394 int set2_entries,
1395 struct memb_ring_id *old_ring_id,
1396 struct srp_addr *and,
1397 int *and_entries)
1398{
1399 int i;
1400 int j;
1401 int found = 0;
1402
1403 *and_entries = 0;
1404
1405 for (i = 0; i < set2_entries; i++) {
1406 for (j = 0; j < set1_entries; j++) {
1407 if (srp_addr_equal (&set1[j], &set2[i])) {
1408 if (memcmp (&set1_ring_ids[j], old_ring_id, sizeof (struct memb_ring_id)) == 0) {
1409 found = 1;
1410 }
1411 break;
1412 }
1413 }
1414 if (found) {
1415 and[*and_entries] = set1[j];
1416 *and_entries = *and_entries + 1;
1417 }
1418 found = 0;
1419 }
1420 return;
1421}
1422
1423static void memb_set_log(
1424 struct totemsrp_instance *instance,
1425 int level,
1426 const char *string,
1427 struct srp_addr *list,
1428 int list_entries)
1429{
1430 char int_buf[32];
1431 char list_str[512];
1432 int i;
1433
1434 memset(list_str, 0, sizeof(list_str));
1435
1436 for (i = 0; i < list_entries; i++) {
1437 if (i == 0) {
1438 snprintf(int_buf, sizeof(int_buf), CS_PRI_NODE_ID, list[i].nodeid);
1439 } else {
1440 snprintf(int_buf, sizeof(int_buf), "," CS_PRI_NODE_ID, list[i].nodeid);
1441 }
1442
1443 if (strlen(list_str) + strlen(int_buf) >= sizeof(list_str)) {
1444 break ;
1445 }
1447 }
1448
1449 log_printf(level, "List '%s' contains %d entries: %s", string, list_entries, list_str);
1450}
1451
1452static void my_leave_memb_clear(
1453 struct totemsrp_instance *instance)
1454{
1455 memset(instance->my_leave_memb_list, 0, sizeof(instance->my_leave_memb_list));
1456 instance->my_leave_memb_entries = 0;
1457}
1458
1459static unsigned int my_leave_memb_match(
1460 struct totemsrp_instance *instance,
1461 unsigned int nodeid)
1462{
1463 int i;
1464 unsigned int ret = 0;
1465
1466 for (i = 0; i < instance->my_leave_memb_entries; i++){
1467 if (instance->my_leave_memb_list[i] == nodeid){
1468 ret = nodeid;
1469 break;
1470 }
1471 }
1472 return ret;
1473}
1474
/*
 * Add nodeid to the instance's leave list if it is not already there.
 *
 * The list is scanned first; a nodeid that is already present is left alone.
 * A new nodeid is appended only while my_leave_memb_entries is below
 * PROCESSOR_COUNT_MAX - 1; otherwise the "Cannot set LEAVE" branch runs
 * and the nodeid is not stored.
 */
1475 static void my_leave_memb_set(
1476 struct totemsrp_instance *instance,
1477 unsigned int nodeid)
1478{
1479 int i, found = 0;
/* Look for an existing entry so the same nodeid is never stored twice. */
1480 for (i = 0; i < instance->my_leave_memb_entries; i++){
1481 if (instance->my_leave_memb_list[i] == nodeid){
1482 found = 1;
1483 break;
1484 }
1485 }
1486 if (found == 1) {
1487 return;
1488 }
1489 if (instance->my_leave_memb_entries < (PROCESSOR_COUNT_MAX - 1)) {
1490 instance->my_leave_memb_list[instance->my_leave_memb_entries] = nodeid;
1491 instance->my_leave_memb_entries++;
1492 } else {
/* NOTE(review): the call this message belongs to is not visible in this listing. */
1494 "Cannot set LEAVE nodeid=" CS_PRI_NODE_ID, nodeid);
1495 }
1496}
1497
1498
1499static void *totemsrp_buffer_alloc (struct totemsrp_instance *instance)
1500{
1501 assert (instance != NULL);
1502 return totemnet_buffer_alloc (instance->totemnet_context);
1503}
1504
/*
 * Release a buffer on behalf of this instance.
 *
 * Asserts that instance is non-NULL.
 * NOTE(review): the statement that hands ptr back is not visible in this
 * listing - confirm against the full source.
 */
1505 static void totemsrp_buffer_release (struct totemsrp_instance *instance, void *ptr)
1506{
1507 assert (instance != NULL);
1509}
1510
/*
 * (Re)arm the token retransmit timer.
 *
 * The visible arguments register timer_function_token_retransmit_timeout as
 * the callback, pass the instance as its data and keep the timer handle in
 * timer_orf_token_retransmit_timeout. A non-zero result is only logged at
 * error level; the function still returns normally.
 *
 * NOTE(review): the beginning of the timer call that assigns res is not
 * visible in this listing; the log text suggests qb_loop_timer_add - confirm.
 */
1511 static void reset_token_retransmit_timeout (struct totemsrp_instance *instance)
1512{
1513 int32_t res;
1514
1520 (void *)instance,
1521 timer_function_token_retransmit_timeout,
1522 &instance->timer_orf_token_retransmit_timeout);
1523 if (res != 0) {
1524 log_printf(instance->totemsrp_log_level_error, "reset_token_retransmit_timeout - qb_loop_timer_add error : %d", res);
1525 }
1526
1527}
1528
/*
 * Arm the merge detect timer unless one is already outstanding.
 *
 * Nothing happens when my_merge_detect_timeout_outstanding is non-zero.
 * Otherwise a timer is registered with timer_function_merge_detect_timeout
 * as callback and the instance as its data; a non-zero result is logged at
 * error level.
 *
 * NOTE(review): the start of the timer call and the statement following the
 * error check are not visible in this listing.
 */
1529 static void start_merge_detect_timeout (struct totemsrp_instance *instance)
1530{
1531 int32_t res;
1532
1533 if (instance->my_merge_detect_timeout_outstanding == 0) {
1537 (void *)instance,
1538 timer_function_merge_detect_timeout,
1539 &instance->timer_merge_detect_timeout);
1540 if (res != 0) {
1541 log_printf(instance->totemsrp_log_level_error, "start_merge_detect_timeout - qb_loop_timer_add error : %d", res);
1542 }
1543
1545 }
1546}
1547
/*
 * Cancel the merge detect timer.
 *
 * NOTE(review): the body statements are not visible in this listing; the
 * description is taken from the function name only - confirm.
 */
1548 static void cancel_merge_detect_timeout (struct totemsrp_instance *instance)
1549{
1552}
1553
1554/*
1555 * ring_state_* is used to save and restore the sort queue
1556 * state when a recovery operation fails (and enters gather)
1557 */
/*
 * Take a one-time snapshot of the current ring state.
 *
 * When no snapshot is held (old_ring_state_saved == 0) the flag is set,
 * my_ring_id is copied into my_old_ring_id and my_aru is stored in
 * old_ring_state_aru. While the flag stays set, further calls do nothing.
 *
 * NOTE(review): two lines after the aru assignment are not visible in this
 * listing; the log message mentions "high seq received", so that value is
 * presumably saved there as well - confirm.
 */
1558 static void old_ring_state_save (struct totemsrp_instance *instance)
1559{
1560 if (instance->old_ring_state_saved == 0) {
1561 instance->old_ring_state_saved = 1;
1562 memcpy (&instance->my_old_ring_id, &instance->my_ring_id,
1563 sizeof (struct memb_ring_id));
1564 instance->old_ring_state_aru = instance->my_aru;
1567 "Saving state aru %x high seq received %x",
1568 instance->my_aru, instance->my_high_seq_received);
1569 }
1570}
1571
/*
 * Put back the ring state captured by old_ring_state_save().
 *
 * my_aru is reloaded from old_ring_state_aru.
 * NOTE(review): the lines between that assignment and the log arguments are
 * not visible in this listing.
 */
1572 static void old_ring_state_restore (struct totemsrp_instance *instance)
1573{
1574 instance->my_aru = instance->old_ring_state_aru;
1577 "Restoring instance->my_aru %x my high seq received %x",
1578 instance->my_aru, instance->my_high_seq_received);
1579}
1580
/*
 * Drop the saved ring state marker.
 *
 * Clears old_ring_state_saved so that the next old_ring_state_save() call
 * takes a fresh snapshot. The saved values themselves are not touched here.
 */
1581 static void old_ring_state_reset (struct totemsrp_instance *instance)
1582{
1584 "Resetting old ring state");
1585 instance->old_ring_state_saved = 0;
1586}
1587
/*
 * (Re)arm the pause timer.
 *
 * Registers timer_function_pause_timeout as callback with the instance as
 * data and keeps the handle in timer_pause_timeout. A non-zero result is
 * logged at error level.
 *
 * NOTE(review): the start of the timer call that assigns res is not visible
 * in this listing.
 */
1588 static void reset_pause_timeout (struct totemsrp_instance *instance)
1589{
1590 int32_t res;
1591
1596 (void *)instance,
1597 timer_function_pause_timeout,
1598 &instance->timer_pause_timeout);
1599 if (res != 0) {
1600 log_printf(instance->totemsrp_log_level_error, "reset_pause_timeout - qb_loop_timer_add error : %d", res);
1601 }
1602}
1603
/*
 * (Re)arm the token warning timer.
 *
 * Registers timer_function_orf_token_warning as callback with the instance
 * as data and keeps the handle in timer_orf_token_warning. A non-zero result
 * is logged at error level.
 *
 * NOTE(review): the start of the timer call that assigns res is not visible
 * in this listing.
 */
1604 static void reset_token_warning (struct totemsrp_instance *instance) {
1605 int32_t res;
1606
1611 (void *)instance,
1612 timer_function_orf_token_warning,
1613 &instance->timer_orf_token_warning);
1614 if (res != 0) {
1615 log_printf(instance->totemsrp_log_level_error, "reset_token_warning - qb_loop_timer_add error : %d", res);
1616 }
1617}
1618
/*
 * (Re)arm the token loss timer and, if configured, the token warning timer.
 *
 * Registers timer_function_orf_token_timeout as callback with the instance
 * as data and keeps the handle in timer_orf_token_timeout. A non-zero result
 * is logged at error level. When totem_config->token_warning is non-zero the
 * warning timer is re-armed too.
 *
 * NOTE(review): the start of the timer call that assigns res is not visible
 * in this listing.
 */
1619 static void reset_token_timeout (struct totemsrp_instance *instance) {
1620 int32_t res;
1621
1626 (void *)instance,
1627 timer_function_orf_token_timeout,
1628 &instance->timer_orf_token_timeout);
1629 if (res != 0) {
1630 log_printf(instance->totemsrp_log_level_error, "reset_token_timeout - qb_loop_timer_add error : %d", res);
1631 }
1632
/* The warning timer only runs while a warning threshold is configured. */
1633 if (instance->totem_config->token_warning)
1634 reset_token_warning(instance);
1635}
1636
/*
 * (Re)arm the heartbeat timer.
 *
 * Registers timer_function_heartbeat_timeout as callback with the instance
 * as data and keeps the handle in timer_heartbeat_timeout. A non-zero result
 * is logged at error level.
 *
 * NOTE(review): the start of the timer call that assigns res is not visible
 * in this listing.
 */
1637 static void reset_heartbeat_timeout (struct totemsrp_instance *instance) {
1638 int32_t res;
1639
1644 (void *)instance,
1645 timer_function_heartbeat_timeout,
1646 &instance->timer_heartbeat_timeout);
1647 if (res != 0) {
1648 log_printf(instance->totemsrp_log_level_error, "reset_heartbeat_timeout - qb_loop_timer_add error : %d", res);
1649 }
1650}
1651
1652
/*
 * Cancel the token warning timer.
 *
 * NOTE(review): the body statement is not visible in this listing; the
 * description is taken from the function name only - confirm.
 */
1653 static void cancel_token_warning (struct totemsrp_instance *instance) {
1655}
1656
/*
 * Cancel the token loss timer and, if configured, the token warning timer.
 *
 * When totem_config->token_warning is non-zero, cancel_token_warning() is
 * called as well.
 * NOTE(review): the statement that cancels the token timer itself is not
 * visible in this listing.
 */
1657 static void cancel_token_timeout (struct totemsrp_instance *instance) {
1659
1660 if (instance->totem_config->token_warning)
1661 cancel_token_warning(instance);
1662}
1663
/*
 * Cancel the heartbeat timer.
 *
 * NOTE(review): the body statement is not visible in this listing; the
 * description is taken from the function name only - confirm.
 */
1664 static void cancel_heartbeat_timeout (struct totemsrp_instance *instance) {
1666}
1667
/*
 * Cancel the token retransmit timer.
 *
 * NOTE(review): the body statement is not visible in this listing; the
 * description is taken from the function name only - confirm.
 */
1668 static void cancel_token_retransmit_timeout (struct totemsrp_instance *instance)
1669{
1671}
1672
/*
 * Arm the token hold retransmit timer.
 *
 * Registers timer_function_token_hold_retransmit_timeout as callback with
 * the instance as data and keeps the handle in
 * timer_orf_token_hold_retransmit_timeout. A non-zero result is logged at
 * error level.
 *
 * NOTE(review): the start of the timer call that assigns res is not visible
 * in this listing.
 */
1673 static void start_token_hold_retransmit_timeout (struct totemsrp_instance *instance)
1674{
1675 int32_t res;
1676
1680 (void *)instance,
1681 timer_function_token_hold_retransmit_timeout,
1682 &instance->timer_orf_token_hold_retransmit_timeout);
1683 if (res != 0) {
1684 log_printf(instance->totemsrp_log_level_error, "start_token_hold_retransmit_timeout - qb_loop_timer_add error : %d", res);
1685 }
1686}
1687
/*
 * Cancel the token hold retransmit timer.
 *
 * NOTE(review): the body statements are not visible in this listing; the
 * description is taken from the function name only - confirm.
 */
1688 static void cancel_token_hold_retransmit_timeout (struct totemsrp_instance *instance)
1689{
1692}
1693
/*
 * React to the expiry of the consensus timeout.
 *
 * The consensus_timeouts statistic is incremented first. If
 * memb_consensus_agreed() reports agreement, the consensus state is reset,
 * this node's own id is marked again and the token timeout is re-armed.
 * Otherwise memb_consensus_notset() is called and the GATHER state is
 * entered with reason TOTEMSRP_GSFROM_CONSENSUS_TIMEOUT.
 *
 * NOTE(review): several lines (start of the body, part of the
 * memb_consensus_notset() argument list and the call that receives
 * my_failed_list) are not visible in this listing.
 */
1694 static void memb_state_consensus_timeout_expired (
1695 struct totemsrp_instance *instance)
1696{
1699
1700 instance->stats.consensus_timeouts++;
1701 if (memb_consensus_agreed (instance)) {
1702 memb_consensus_reset (instance);
1703
/* After the reset only this node itself is marked in the consensus set. */
1704 memb_consensus_set (instance, &instance->my_id);
1705
1706 reset_token_timeout (instance); // REVIEWED
1707 } else {
1708 memb_consensus_notset (
1709 instance,
1712 instance->my_proc_list,
1713 instance->my_proc_list_entries);
1714
1716 instance->my_failed_list, &instance->my_failed_list_entries);
1717 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_CONSENSUS_TIMEOUT);
1718 }
1719}
1720
1721static void memb_join_message_send (struct totemsrp_instance *instance);
1722
1723static void memb_merge_detect_transmit (struct totemsrp_instance *instance);
1724
1725/*
1726 * Timers used for various states of the membership algorithm
1727 */
/*
 * Pause timer callback; data is the totemsrp instance.
 *
 * Re-arms the pause timer so the callback keeps firing periodically.
 * NOTE(review): one statement before the re-arm is not visible in this
 * listing.
 */
1728 static void timer_function_pause_timeout (void *data)
1729{
1730 struct totemsrp_instance *instance = data;
1731
1733 reset_pause_timeout (instance);
1734}
1735
1736static void memb_recovery_state_token_loss (struct totemsrp_instance *instance)
1737{
1738 old_ring_state_restore (instance);
1739 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_RECOVERY_STATE);
1740 instance->stats.recovery_token_lost++;
1741}
1742
/*
 * Token warning timer callback; data is the totemsrp instance.
 *
 * While totem_config->token_warning is non-zero a "Token has not been
 * received" message is produced using tv_diff and the warning timer is
 * re-armed. If the setting is zero the warning timer is cancelled instead.
 *
 * NOTE(review): the declaration and computation of tv_diff and the start of
 * the log call are not visible in this listing; the visible operand is the
 * rx value of the latest token statistics entry.
 */
1743 static void timer_function_orf_token_warning (void *data)
1744{
1745 struct totemsrp_instance *instance = data;
1747
1748 /* need to protect against the case where token_warning is set to 0 dynamically */
1749 if (instance->totem_config->token_warning) {
1751 instance->stats.token[instance->stats.latest_token].rx;
1753 "Token has not been received in %"PRIu64" ms", tv_diff);
1754 reset_token_warning(instance);
1755 } else {
1756 cancel_token_warning(instance);
1757 }
1758}
1759
/*
 * Token loss timer callback; data is the totemsrp instance.
 *
 * Dispatches on memb_state. Each visible branch bumps a *_token_lost
 * statistic or sets a flag and moves the state machine on:
 *  - "lost in the OPERATIONAL state" branch: enters GATHER and increments
 *    operational_token_lost.
 *  - MEMB_STATE_GATHER: runs the consensus timeout handling, then enters
 *    GATHER again and increments gather_token_lost.
 *  - MEMB_STATE_COMMIT: enters GATHER and increments commit_token_lost.
 *  - "lost in the RECOVERY state" branch: runs the recovery token loss
 *    handling and sets orf_token_discard.
 *
 * NOTE(review): the case labels of the first and last branch and the starts
 * of the log calls are not visible in this listing.
 */
1760 static void timer_function_orf_token_timeout (void *data)
1761{
1762 struct totemsrp_instance *instance = data;
1763
1764 switch (instance->memb_state) {
1767 "The token was lost in the OPERATIONAL state.");
1769 "A processor failed, forming new configuration:"
1770 " token timed out (%ums), waiting %ums for consensus.",
1771 instance->totem_config->token_timeout,
1772 instance->totem_config->consensus_timeout);
1774 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_OPERATIONAL_STATE);
1775 instance->stats.operational_token_lost++;
1776 break;
1777
1778 case MEMB_STATE_GATHER:
1780 "The consensus timeout expired (%ums).",
1781 instance->totem_config->consensus_timeout);
1782 memb_state_consensus_timeout_expired (instance);
1783 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_THE_CONSENSUS_TIMEOUT_EXPIRED);
1784 instance->stats.gather_token_lost++;
1785 break;
1786
1787 case MEMB_STATE_COMMIT:
1789 "The token was lost in the COMMIT state.");
1790 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_COMMIT_STATE);
1791 instance->stats.commit_token_lost++;
1792 break;
1793
1796 "The token was lost in the RECOVERY state.");
1797 memb_recovery_state_token_loss (instance);
1798 instance->orf_token_discard = 1;
1799 break;
1800 }
1801}
1802
/*
 * Heartbeat timer callback; data is the totemsrp instance.
 *
 * A heartbeat expiry is treated exactly like a token loss: after the log
 * message the token timeout handler is invoked with the same data pointer.
 * NOTE(review): the start of the log call is not visible in this listing.
 */
1803 static void timer_function_heartbeat_timeout (void *data)
1804{
1805 struct totemsrp_instance *instance = data;
1807 "HeartBeat Timer expired Invoking token loss mechanism in state %d ", instance->memb_state);
1808 timer_function_orf_token_timeout(data);
1809}
1810
/*
 * Join timer callback; data is the totemsrp instance.
 *
 * In MEMB_STATE_GATHER and MEMB_STATE_COMMIT a join message is sent and the
 * timer is registered again with this same function as callback, so the join
 * message keeps being repeated. A non-zero result of the timer call is
 * logged at error level. The other visible branch asserts, i.e. it is
 * considered unreachable.
 *
 * NOTE(review): the case labels of the asserting branch and the start of the
 * timer call are not visible in this listing.
 */
1811 static void memb_timer_function_state_gather (void *data)
1812{
1813 struct totemsrp_instance *instance = data;
1814 int32_t res;
1815
1816 switch (instance->memb_state) {
1819 assert (0); /* this should never happen */
1820 break;
1821 case MEMB_STATE_GATHER:
1822 case MEMB_STATE_COMMIT:
1823 memb_join_message_send (instance);
1824
1825 /*
1826 * Restart the join timeout
1827 */
1829
1833 (void *)instance,
1834 memb_timer_function_state_gather,
1835 &instance->memb_timer_state_gather_join_timeout);
1836
1837 if (res != 0) {
1838 log_printf(instance->totemsrp_log_level_error, "memb_timer_function_state_gather - qb_loop_timer_add error : %d", res);
1839 }
1840 break;
1841 }
1842}
1843
/*
 * Consensus timer callback.
 *
 * data is the totemsrp instance the timer was registered for; the work is
 * delegated to memb_state_consensus_timeout_expired().
 */
static void memb_timer_function_gather_consensus_timeout (void *data)
{
	memb_state_consensus_timeout_expired ((struct totemsrp_instance *)data);
}
1849
/*
 * Move recovered old-ring messages from the recovery sort queue into the
 * regular sort queue.
 *
 * Sequence numbers SEQNO_START_MSG + 1 .. my_aru of the recovery queue are
 * walked. Slots that are empty are skipped. For an encapsulated recovery
 * message the inner message is located directly behind the outer
 * struct mcast header and its length is reduced accordingly. The message is
 * added to the regular sort queue only if its ring id equals my_old_ring_id
 * and its sequence number is not already in use there.
 *
 * NOTE(review): several declarations and statements (among them the
 * encapsulation test, the assignment of mcast and the bookkeeping of
 * old_ring_state_high_seq_received) are not visible in this listing.
 */
1850 static void deliver_messages_from_recovery_to_regular (struct totemsrp_instance *instance)
1851{
1852 unsigned int i;
1855 unsigned int range = 0;
1856 int res;
1857 void *ptr;
1858 struct mcast *mcast;
1859
1861 "recovery to regular %x-%x", SEQNO_START_MSG + 1, instance->my_aru);
1862
/* Number of sequence numbers received without a gap on the new ring. */
1863 range = instance->my_aru - SEQNO_START_MSG;
1864 /*
1865 * Move messages from recovery to regular sort queue
1866 */
1867// todo should i be initialized to 0 or 1 ?
1868 for (i = 1; i <= range; i++) {
1869 res = sq_item_get (&instance->recovery_sort_queue,
1870 i + SEQNO_START_MSG, &ptr);
1871 if (res != 0) {
1872 continue;
1873 }
1875
1876 /*
1877 * Convert recovery message into regular message
1878 */
1881 /*
1882 * Message is a recovery message encapsulated
1883 * in a new ring message
1884 */
/* The original message starts right behind the outer mcast header. */
1885 regular_message_item.mcast =
1886 (struct mcast *)(((char *)recovery_message_item->mcast) + sizeof (struct mcast));
1887 regular_message_item.msg_len =
1888 recovery_message_item->msg_len - sizeof (struct mcast);
1890 } else {
1891 /*
1892 * TODO this case shouldn't happen
1893 */
1894 continue;
1895 }
1896
1898 "comparing if ring id is for this processors old ring seqno " CS_PRI_RING_ID_SEQ,
1899 (uint64_t)mcast->seq);
1900
1901 /*
1902 * Only add this message to the regular sort
1903 * queue if it was originated with the same ring
1904 * id as the previous ring
1905 */
1906 if (memcmp (&instance->my_old_ring_id, &mcast->ring_id,
1907 sizeof (struct memb_ring_id)) == 0) {
1908
/* Do not overwrite a message that is already stored under this seq. */
1909 res = sq_item_inuse (&instance->regular_sort_queue, mcast->seq);
1910 if (res == 0) {
1911 sq_item_add (&instance->regular_sort_queue,
1913 if (sq_lt_compare (instance->old_ring_state_high_seq_received, mcast->seq)) {
1915 }
1916 }
1917 } else {
1919 "-not adding msg with seq no " CS_PRI_RING_ID_SEQ, (uint64_t)mcast->seq);
1920 }
1921 }
1922}
1923
1924/*
1925 * Change states in the state machine of the membership algorithm
1926 */
/*
 * Enter the OPERATIONAL state and install the new membership.
 *
 * Visible steps, in order:
 *  - reset consensus and saved old ring state, move recovered messages into
 *    the regular sort queue and deliver old-ring messages to the application
 *    (with my_aru temporarily set to old_ring_state_aru);
 *  - compute the left list (current members not in the transitional
 *    membership) and the joined list (new members not in the transitional
 *    membership) and install my_new_memb_list as my_memb_list;
 *  - deliver the transitional and then the regular configuration;
 *  - make the recovery sort queue the regular one, rebuild my_proc_list from
 *    the new membership and clear the failed list;
 *  - discard already handled messages from the sort queue;
 *  - build the " joined:", " left:" and " failed:" log fragments, clear the
 *    leave list, update statistics, re-arm the pause timer and remember the
 *    current ring id as my_old_ring_id.
 *
 * NOTE(review): a number of declarations, the configuration-change callback
 * invocations, the log calls and the assignment of the new memb_state are
 * not visible in this listing.
 */
1927 static void memb_state_operational_enter (struct totemsrp_instance *instance)
1928{
1930 int joined_list_entries = 0;
1931 unsigned int aru_save;
1935 unsigned int left_list[PROCESSOR_COUNT_MAX];
1936 unsigned int i;
1937 unsigned int res;
1938 char left_node_msg[1024];
1939 char joined_node_msg[1024];
1940 char failed_node_msg[1024];
1941
1942 instance->originated_orf_token = 0;
1943
1944 memb_consensus_reset (instance);
1945
1946 old_ring_state_reset (instance);
1947
1948 deliver_messages_from_recovery_to_regular (instance);
1949
1951 "Delivering to app %x to %x",
1952 instance->my_high_delivered + 1, instance->old_ring_state_high_seq_received);
1953
/*
 * Old-ring delivery below runs against the old ring's aru; the new ring's
 * value is kept in aru_save and put back further down.
 */
1954 aru_save = instance->my_aru;
1955 instance->my_aru = instance->old_ring_state_aru;
1956
1957 messages_deliver_to_app (instance, 0, instance->old_ring_state_high_seq_received);
1958
1959 /*
1960 * Calculate joined and left list
1961 */
1962 memb_set_subtract (instance->my_left_memb_list,
1963 &instance->my_left_memb_entries,
1964 instance->my_memb_list, instance->my_memb_entries,
1965 instance->my_trans_memb_list, instance->my_trans_memb_entries);
1966
1967 memb_set_subtract (joined_list, &joined_list_entries,
1968 instance->my_new_memb_list, instance->my_new_memb_entries,
1969 instance->my_trans_memb_list, instance->my_trans_memb_entries);
1970
1971 /*
1972 * Install new membership
1973 */
1974 instance->my_memb_entries = instance->my_new_memb_entries;
1975 memcpy (&instance->my_memb_list, instance->my_new_memb_list,
1976 sizeof (struct srp_addr) * instance->my_memb_entries);
1977 instance->last_released = 0;
1978 instance->my_set_retrans_flg = 0;
1979
1980 /*
1981 * Deliver transitional configuration to application
1982 */
1983 srp_addr_to_nodeid (instance, left_list, instance->my_left_memb_list,
1984 instance->my_left_memb_entries);
1985 srp_addr_to_nodeid (instance, trans_memb_list_totemip,
1986 instance->my_trans_memb_list, instance->my_trans_memb_entries);
1990 0, 0, &instance->my_ring_id);
1991 /*
1992 * Switch new totemsrp messages queue. Messages sent from now on are stored
1993 * in different queue so synchronization messages are delivered first. Totempg
1994 * buffers will be switched later.
1995 */
1996 instance->waiting_trans_ack = 1;
1997
1998// TODO we need to filter to ensure we only deliver those
1999// messages which are part of instance->my_deliver_memb
2000 messages_deliver_to_app (instance, 1, instance->old_ring_state_high_seq_received);
2001
2002 /*
2003 * Switch totempg buffers. This used to be right after
2004 * instance->waiting_trans_ack = 1;
2005 * line. This was causing problem, because there may be not yet
2006 * processed parts of messages in totempg buffers.
2007 * So when buffers were switched and recovered messages
2008 * got delivered it was not possible to assemble them.
2009 */
2011
2012 instance->my_aru = aru_save;
2013
2014 /*
2015 * Deliver regular configuration to application
2016 */
2017 srp_addr_to_nodeid (instance, new_memb_list_totemip,
2018 instance->my_new_memb_list, instance->my_new_memb_entries);
2019 srp_addr_to_nodeid (instance, joined_list_totemip, joined_list,
2023 0, 0,
2025
2026 /*
2027 * The recovery sort queue now becomes the regular
2028 * sort queue. It is necessary to copy the state
2029 * into the regular sort queue.
2030 */
2031 sq_copy (&instance->regular_sort_queue, &instance->recovery_sort_queue);
2032 instance->my_last_aru = SEQNO_START_MSG;
2033
2034 /* When making my_proc_list smaller, ensure that the
2035 * now non-used entries are zero-ed out. There are some suspect
2036 * assert's that assume that there is always 2 entries in the list.
2037 * These fail when my_proc_list is reduced to 1 entry (and the
2038 * valid [0] entry is the same as the 'unused' [1] entry).
2039 */
2040 memset(instance->my_proc_list, 0,
2041 sizeof (struct srp_addr) * instance->my_proc_list_entries);
2042
/* my_memb_entries was set to my_new_memb_entries above, so both counts agree. */
2043 instance->my_proc_list_entries = instance->my_new_memb_entries;
2044 memcpy (instance->my_proc_list, instance->my_new_memb_list,
2045 sizeof (struct srp_addr) * instance->my_memb_entries);
2046
2047 instance->my_failed_list_entries = 0;
2048 /*
2049 * TODO Not exactly to spec
2050 *
2051 * At the entry to this function all messages without a gap are
2052 * delivered.
2053 *
2054 * This code throws away messages from the last gap in the sort queue
2055 * to my_high_seq_received
2056 *
2057 * What should really happen is we should deliver all messages up to
2058 * a gap, then deliver the transitional configuration, then deliver
2059 * the messages between the first gap and my_high_seq_received, then
2060 * deliver a regular configuration, then deliver the regular
2061 * configuration
2062 *
2063 * Unfortunately totempg doesn't appear to like this operating mode
2064 * which needs more inspection
2065 */
/*
 * Walk downwards from my_high_seq_received until a stored item is found
 * (sq_item_get returns 0) or sequence number 0 is reached.
 */
2066 i = instance->my_high_seq_received + 1;
2067 do {
2068 void *ptr;
2069
2070 i -= 1;
2071 res = sq_item_get (&instance->regular_sort_queue, i, &ptr);
2072 if (i == 0) {
2073 break;
2074 }
2075 } while (res);
2076
2077 instance->my_high_delivered = i;
2078
/* Free the message buffers of every stored item up to my_high_delivered. */
2079 for (i = 0; i <= instance->my_high_delivered; i++) {
2080 void *ptr;
2081
2082 res = sq_item_get (&instance->regular_sort_queue, i, &ptr);
2083 if (res == 0) {
2085
2087 free (regular_message->mcast);
2088 }
2089 }
2090 sq_items_release (&instance->regular_sort_queue, instance->my_high_delivered);
2091 instance->last_released = instance->my_high_delivered;
2092
2093 if (joined_list_entries) {
2094 int sptr = 0;
2095 sptr += snprintf(joined_node_msg, sizeof(joined_node_msg)-sptr, " joined:");
2096 for (i=0; i< joined_list_entries; i++) {
2098 }
2099 }
2100 else {
2101 joined_node_msg[0] = '\0';
2102 }
2103
2104 if (instance->my_left_memb_entries) {
2105 int sptr = 0;
2106 int sptr2 = 0;
2107 sptr += snprintf(left_node_msg, sizeof(left_node_msg)-sptr, " left:");
2108 for (i=0; i< instance->my_left_memb_entries; i++) {
2110 }
/*
 * A node that left without being recorded in the leave list is reported
 * under " failed:".
 */
2111 for (i=0; i< instance->my_left_memb_entries; i++) {
2112 if (my_leave_memb_match(instance, left_list[i]) == 0) {
2113 if (sptr2 == 0) {
2114 sptr2 += snprintf(failed_node_msg, sizeof(failed_node_msg)-sptr2, " failed:");
2115 }
2117 }
2118 }
2119 if (sptr2 == 0) {
2120 failed_node_msg[0] = '\0';
2121 }
2122 }
2123 else {
2124 left_node_msg[0] = '\0';
2125 failed_node_msg[0] = '\0';
2126 }
2127
2128 my_leave_memb_clear(instance);
2129
2131 "entering OPERATIONAL state.");
2133 "A new membership (" CS_PRI_RING_ID ") was formed. Members%s%s",
2134 instance->my_ring_id.rep,
2135 (uint64_t)instance->my_ring_id.seq,
2138
2139 if (strlen(failed_node_msg)) {
2141 "Failed to receive the leave message.%s",
2143 }
2144
2146
2147 instance->stats.operational_entered++;
2148 instance->stats.continuous_gather = 0;
2149
2150 instance->my_received_flg = 1;
2151
2152 reset_pause_timeout (instance);
2153
2154 /*
2155 * Save ring id information from this configuration to determine
2156 * which processors are transitioning from old regular configuration
2157 * in to new regular configuration on the next configuration change
2158 */
2159 memcpy (&instance->my_old_ring_id, &instance->my_ring_id,
2160 sizeof (struct memb_ring_id));
2161
2162 return;
2163}
2164
/*
 * Enter the GATHER state.
 *
 * gather_from (used in the log arguments below) identifies why GATHER is
 * entered. Visible steps: discard ORF tokens from now on, make sure this
 * node is part of my_proc_list, send a join message, restart the join and
 * consensus timers (errors are only logged), cancel the token retransmit,
 * token loss and merge detect timers, reset the consensus state with only
 * this node marked, then set memb_state to MEMB_STATE_GATHER and update the
 * gather statistics.
 *
 * NOTE(review): the parameter line declaring gather_from, the starts of the
 * timer calls, the start of the log call and the condition guarding the
 * continuous_gather increment are not visible in this listing.
 */
2165 static void memb_state_gather_enter (
2166 struct totemsrp_instance *instance,
2168{
2169 int32_t res;
2170
2171 instance->orf_token_discard = 1;
2172
2173 instance->originated_orf_token = 0;
2174
/* Merge this node's own id into the processor list. */
2175 memb_set_merge (
2176 &instance->my_id, 1,
2177 instance->my_proc_list, &instance->my_proc_list_entries);
2178
2179 memb_join_message_send (instance);
2180
2181 /*
2182 * Restart the join timeout
2183 */
2185
2189 (void *)instance,
2190 memb_timer_function_state_gather,
2191 &instance->memb_timer_state_gather_join_timeout);
2192 if (res != 0) {
2193 log_printf(instance->totemsrp_log_level_error, "memb_state_gather_enter - qb_loop_timer_add error(1) : %d", res);
2194 }
2195
2196 /*
2197 * Restart the consensus timeout
2198 */
2201
2205 (void *)instance,
2206 memb_timer_function_gather_consensus_timeout,
2207 &instance->memb_timer_state_gather_consensus_timeout);
2208 if (res != 0) {
2209 log_printf(instance->totemsrp_log_level_error, "memb_state_gather_enter - qb_loop_timer_add error(2) : %d", res);
2210 }
2211
2212 /*
2213 * Cancel the token loss and token retransmission timeouts
2214 */
2215 cancel_token_retransmit_timeout (instance); // REVIEWED
2216 cancel_token_timeout (instance); // REVIEWED
2217 cancel_merge_detect_timeout (instance);
2218
2219 memb_consensus_reset (instance);
2220
2221 memb_consensus_set (instance, &instance->my_id);
2222
2224 "entering GATHER state from %d(%s).",
2225 gather_from, gsfrom_to_msg(gather_from));
2226
2227 instance->memb_state = MEMB_STATE_GATHER;
2228 instance->stats.gather_entered++;
2229
2231 /*
2232 * State 3 means gather, so we are continuously gathering.
2233 */
2234 instance->stats.continuous_gather++;
2235 }
2236
2237 return;
2238}
2239
2240static void timer_function_token_retransmit_timeout (void *data);
2241
/*
 * Callback run with the totemsrp instance as context.
 *
 * Sends the commit token for that instance via
 * memb_state_commit_token_send().
 */
static void target_set_completed (
	void *context)
{
	memb_state_commit_token_send ((struct totemsrp_instance *)context);
}
2250
/*
 * Enter the COMMIT state.
 *
 * Visible steps: snapshot the old ring state, update the commit token and
 * set its target, adopt the commit token's ring id and persist it through
 * the memb_ring_id_store callback, remember the ring sequence number in
 * token_ring_id_seq, set memb_state to MEMB_STATE_COMMIT, re-arm the token
 * retransmit and token loss timers, update statistics and zero the flow
 * control counters my_trc, my_pbl and my_cbl.
 *
 * NOTE(review): four statements between the target set and the ring id
 * update, and the start of the log call, are not visible in this listing.
 */
2251 static void memb_state_commit_enter (
2252 struct totemsrp_instance *instance)
2253{
2254 old_ring_state_save (instance);
2255
2256 memb_state_commit_token_update (instance);
2257
2258 memb_state_commit_token_target_set (instance);
2259
2261
2263
2265
2267
2268 memb_ring_id_set (instance, &instance->commit_token->ring_id);
2269
2270 instance->memb_ring_id_store (&instance->my_ring_id, instance->my_id.nodeid);
2271
2272 instance->token_ring_id_seq = instance->my_ring_id.seq;
2273
2275 "entering COMMIT state.");
2276
2277 instance->memb_state = MEMB_STATE_COMMIT;
2278 reset_token_retransmit_timeout (instance); // REVIEWED
2279 reset_token_timeout (instance); // REVIEWED
2280
2281 instance->stats.commit_entered++;
2282 instance->stats.continuous_gather = 0;
2283
2284 /*
2285 * reset all flow control variables since we are starting a new ring
2286 */
2287 instance->my_trc = 0;
2288 instance->my_pbl = 0;
2289 instance->my_cbl = 0;
2290 /*
2291 * commit token sent after callback that token target has been set
2292 */
2293}
2294
/*
 * Enter the RECOVERY state using the membership carried by commit_token.
 *
 * Visible steps:
 *  - locate the address array at end_of_commit_token and the per-member
 *    entries directly behind its addr_entries addresses;
 *  - accept ORF tokens again, reinitialise the recovery sort queue and the
 *    retransmit message queue and pass the commit token on;
 *  - build the transitional membership with memb_set_and_with_ring_id()
 *    from the new and current member lists and my_old_ring_id;
 *  - if a transitional member reports received_flg == 0, use the
 *    transitional membership as delivery membership and originate old-ring
 *    messages: determine the lowest aru and highest delivered sequence
 *    number among members of the old ring and queue copies of the old-ring
 *    messages, each behind a fresh struct mcast header, on
 *    retrans_message_queue;
 *  - reset the sequence bookkeeping to SEQNO_START_MSG, re-arm the token
 *    loss and token retransmit timers, set memb_state to
 *    MEMB_STATE_RECOVERY and update statistics.
 *
 * NOTE(review): the commit_token parameter line, several declarations, the
 * initialisation of low_ring_aru, the computation of range, the
 * no_originate/originated labels and the log call starts are not visible in
 * this listing.
 */
2295 static void memb_state_recovery_enter (
2296 struct totemsrp_instance *instance,
2298{
2299 int i;
2300 int local_received_flg = 1;
2301 unsigned int low_ring_aru;
2302 unsigned int range = 0;
2303 unsigned int messages_originated = 0;
2304 const struct srp_addr *addr;
2307
/* The per-member entries follow the addr_entries addresses in the token. */
2308 addr = (const struct srp_addr *)commit_token->end_of_commit_token;
2309 memb_list = (struct memb_commit_token_memb_entry *)(addr + commit_token->addr_entries);
2310
2312 "entering RECOVERY state.");
2313
2314 instance->orf_token_discard = 0;
2315
2316 instance->my_high_ring_delivered = 0;
2317
2318 sq_reinit (&instance->recovery_sort_queue, SEQNO_START_MSG);
2319 cs_queue_reinit (&instance->retrans_message_queue);
2320
2322
2323 memb_state_commit_token_send_recovery (instance, commit_token);
2324
2325 instance->my_token_seq = SEQNO_START_TOKEN - 1;
2326
2327 /*
2328 * Build regular configuration
2329 */
2331 instance->totemnet_context,
2332 commit_token->addr_entries);
2333
2334 /*
2335 * Build transitional configuration
2336 */
2337 for (i = 0; i < instance->my_new_memb_entries; i++) {
2340 sizeof (struct memb_ring_id));
2341 }
2342 memb_set_and_with_ring_id (
2343 instance->my_new_memb_list,
2345 instance->my_new_memb_entries,
2346 instance->my_memb_list,
2347 instance->my_memb_entries,
2348 &instance->my_old_ring_id,
2349 instance->my_trans_memb_list,
2350 &instance->my_trans_memb_entries);
2351
2352 for (i = 0; i < instance->my_trans_memb_entries; i++) {
2354 "TRANS [%d] member " CS_PRI_NODE_ID ":", i, instance->my_trans_memb_list[i].nodeid);
2355 }
2356 for (i = 0; i < instance->my_new_memb_entries; i++) {
2358 "position [%d] member " CS_PRI_NODE_ID ":", i, addr[i].nodeid);
2360 "previous ringid (" CS_PRI_RING_ID ")",
2361 memb_list[i].ring_id.rep, (uint64_t)memb_list[i].ring_id.seq);
2362
2364 "aru %x high delivered %x received flag %d",
2365 memb_list[i].aru,
2366 memb_list[i].high_delivered,
2367 memb_list[i].received_flg);
2368
2369 // assert (totemip_print (&memb_list[i].ring_id.rep) != 0);
2370 }
2371 /*
2372 * Determine if any received flag is false
2373 */
2374 for (i = 0; i < commit_token->addr_entries; i++) {
2375 if (memb_set_subset (&instance->my_new_memb_list[i], 1,
2376 instance->my_trans_memb_list, instance->my_trans_memb_entries) &&
2377
2378 memb_list[i].received_flg == 0) {
2379 instance->my_deliver_memb_entries = instance->my_trans_memb_entries;
2380 memcpy (instance->my_deliver_memb_list, instance->my_trans_memb_list,
2381 sizeof (struct srp_addr) * instance->my_trans_memb_entries);
2383 break;
2384 }
2385 }
2386 if (local_received_flg == 1) {
2387 goto no_originate;
2388 } /* Else originate messages if we should */
2389
2390 /*
2391 * Calculate my_low_ring_aru, instance->my_high_ring_delivered for the transitional membership
2392 */
2393 for (i = 0; i < commit_token->addr_entries; i++) {
2394 if (memb_set_subset (&instance->my_new_memb_list[i], 1,
2395 instance->my_deliver_memb_list,
2396 instance->my_deliver_memb_entries) &&
2397
2398 memcmp (&instance->my_old_ring_id,
2399 &memb_list[i].ring_id,
2400 sizeof (struct memb_ring_id)) == 0) {
2401
/* Track the minimum aru and the maximum high_delivered of the old ring. */
2402 if (sq_lt_compare (memb_list[i].aru, low_ring_aru)) {
2403
2404 low_ring_aru = memb_list[i].aru;
2405 }
2406 if (sq_lt_compare (instance->my_high_ring_delivered, memb_list[i].high_delivered)) {
2407 instance->my_high_ring_delivered = memb_list[i].high_delivered;
2408 }
2409 }
2410 }
2411
2412 /*
2413 * Copy all old ring messages to instance->retrans_message_queue
2414 */
2416 if (range == 0) {
2417 /*
2418 * No messages to copy
2419 */
2420 goto no_originate;
2421 }
2423
2425 "copying all old ring messages from %x-%x.",
2427
2428 for (i = 1; i <= range; i++) {
2431 void *ptr;
2432 int res;
2433
2434 res = sq_item_get (&instance->regular_sort_queue,
2435 low_ring_aru + i, &ptr);
2436 if (res != 0) {
2437 continue;
2438 }
2441 memset (&message_item, 0, sizeof (struct message_item));
2442 // TODO LEAK
2443 message_item.mcast = totemsrp_buffer_alloc (instance);
2445 memset(message_item.mcast, 0, sizeof (struct mcast));
2449 message_item.mcast->system_from = instance->my_id;
2451
2455 sizeof (struct memb_ring_id));
/* The queued message is the old message prefixed by a new mcast header. */
2456 message_item.msg_len = sort_queue_item->msg_len + sizeof (struct mcast);
2457 memcpy (((char *)message_item.mcast) + sizeof (struct mcast),
2460 cs_queue_item_add (&instance->retrans_message_queue, &message_item);
2461 }
2463 "Originated %d messages in RECOVERY.", messages_originated);
2464 goto originated;
2465
2468 "Did not need to originate any messages in recovery.");
2469
2471 instance->my_aru = SEQNO_START_MSG;
2472 instance->my_aru_count = 0;
2473 instance->my_seq_unchanged = 0;
2475 instance->my_install_seq = SEQNO_START_MSG;
2476 instance->last_released = SEQNO_START_MSG;
2477
2478 reset_token_timeout (instance); // REVIEWED
2479 reset_token_retransmit_timeout (instance); // REVIEWED
2480
2481 instance->memb_state = MEMB_STATE_RECOVERY;
2482 instance->stats.recovery_entered++;
2483 instance->stats.continuous_gather = 0;
2484
2485 return;
2486}
2487
/*
 * Sends a token hold cancel for the instance passed as srp_context.
 *
 * NOTE(review): the line carrying this function's name, return type and
 * parameter list is not visible in this listing - confirm the signature
 * against the full source.
 */
2489{
2490 struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
2491
2492 token_hold_cancel_send (instance);
2493
2494 return;
2495}
2496
/*
 * Queue a message, given as an I/O vector, for multicast.
 *
 * srp_context - the totemsrp instance
 * iovec       - iov_len buffers whose lengths are accumulated in addr_idx
 * iov_len     - number of entries in iovec
 * guarantee   - not referenced in the visible lines
 *
 * Returns 0 once the item has been added to the pending queue and -1 when
 * that queue is full or on the error path at the end of the function.
 * While waiting_trans_ack is set a different queue than new_message_queue
 * is selected.
 *
 * NOTE(review): the line with the function name and return type, the
 * message_item declaration, the header field assignments, the copy of each
 * iovec buffer, the length assignment and the error_mcast label are not
 * visible in this listing.
 */
2498 void *srp_context,
2499 struct iovec *iovec,
2500 unsigned int iov_len,
2501 int guarantee)
2502{
2503 struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
2504 int i;
2506 char *addr;
2507 unsigned int addr_idx;
2508 struct cs_queue *queue_use;
2509
2510 if (instance->waiting_trans_ack) {
2512 } else {
2513 queue_use = &instance->new_message_queue;
2514 }
2515
2516 if (cs_queue_is_full (queue_use)) {
2517 log_printf (instance->totemsrp_log_level_debug, "queue full");
2518 return (-1);
2519 }
2520
2521 memset (&message_item, 0, sizeof (struct message_item));
2522
2523 /*
2524 * Allocate pending item
2525 */
2526 message_item.mcast = totemsrp_buffer_alloc (instance);
2527 if (message_item.mcast == 0) {
2528 goto error_mcast;
2529 }
2530
2531 /*
2532 * Set mcast header
2533 */
2534 memset(message_item.mcast, 0, sizeof (struct mcast));
2539
2542
2544 message_item.mcast->system_from = instance->my_id;
2545
/* Payload is laid out directly behind the struct mcast header. */
2546 addr = (char *)message_item.mcast;
2547 addr_idx = sizeof (struct mcast);
2548 for (i = 0; i < iov_len; i++) {
2550 addr_idx += iovec[i].iov_len;
2551 }
2552
2554
2555 log_printf (instance->totemsrp_log_level_trace, "mcasted message added to pending queue");
2556 instance->stats.mcast_tx++;
2557 cs_queue_item_add (queue_use, &message_item);
2558
2559 return (0);
2560
2562 return (-1);
2563}
2564
2565/*
2566 * Determine if there is room to queue a new message
2567 */
/*
 * Returns the number of free slots, as reported by cs_queue_avail(), in the
 * queue new messages currently go to. While waiting_trans_ack is set a
 * different queue than new_message_queue is inspected.
 *
 * NOTE(review): the line carrying this function's name and signature and
 * the queue assignment in the waiting_trans_ack branch are not visible in
 * this listing.
 */
2569{
2570 struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
2571 int avail;
2572 struct cs_queue *queue_use;
2573
2574 if (instance->waiting_trans_ack) {
2576 } else {
2577 queue_use = &instance->new_message_queue;
2578 }
2579 cs_queue_avail (queue_use, &avail);
2580
2581 return (avail);
2582}
2583
2584/*
2585 * ORF Token Management
2586 */
2587/*
2588 * Recast message to mcast group if it is available
2589 */
/*
 * Look up the message with sequence number seq for re-multicast.
 *
 * The recovery sort queue is used in MEMB_STATE_RECOVERY, the regular sort
 * queue otherwise. Returns -1 when seq is outside the queue's range or no
 * item is stored under seq, 0 otherwise.
 *
 * NOTE(review): the declaration that receives the queue item and the start
 * of the call that transmits it through totemnet_context are not visible in
 * this listing.
 */
2590 static int orf_token_remcast (
2591 struct totemsrp_instance *instance,
2592 int seq)
2593{
2595 int res;
2596 void *ptr;
2597
2598 struct sq *sort_queue;
2599
2600 if (instance->memb_state == MEMB_STATE_RECOVERY) {
2601 sort_queue = &instance->recovery_sort_queue;
2602 } else {
2603 sort_queue = &instance->regular_sort_queue;
2604 }
2605
2606 res = sq_in_range (sort_queue, seq);
2607 if (res == 0) {
2608 log_printf (instance->totemsrp_log_level_debug, "sq not in range");
2609 return (-1);
2610 }
2611
2612 /*
2613 * Get RTR item at seq, if not available, return
2614 */
2615 res = sq_item_get (sort_queue, seq, &ptr);
2616 if (res != 0) {
2617 return -1;
2618 }
2619
2621
2623 instance->totemnet_context,
2626
2627 return (0);
2628}
2629
2630
2631/*
2632 * Free all freeable messages from ring
2633 */
/*
 * Release messages from the regular sort queue that are no longer needed.
 *
 * release_to is lowered to my_last_aru and to my_high_delivered whenever
 * those compare lower, so nothing beyond either value is released. If
 * release_to lies before last_released nothing is done. Otherwise every
 * sequence number in (last_released, release_to] has its message buffer
 * handed back through totemsrp_buffer_release() (when an item is stored)
 * and its queue slot released; last_released is then advanced by the same
 * amount.
 *
 * NOTE(review): the initial assignment of release_to (presumably from
 * token_aru - confirm), one declaration, the statement after the range
 * computation and the start of the log call are not visible in this listing.
 */
2634 static void messages_free (
2635 struct totemsrp_instance *instance,
2636 unsigned int token_aru)
2637{
2639 unsigned int i;
2640 int res;
2641 int log_release = 0;
2642 unsigned int release_to;
2643 unsigned int range = 0;
2644
2646 if (sq_lt_compare (instance->my_last_aru, release_to)) {
2647 release_to = instance->my_last_aru;
2648 }
2649 if (sq_lt_compare (instance->my_high_delivered, release_to)) {
2650 release_to = instance->my_high_delivered;
2651 }
2652
2653 /*
2654 * Ensure we don't try to release before an already released point
2655 */
2656 if (sq_lt_compare (release_to, instance->last_released)) {
2657 return;
2658 }
2659
2660 range = release_to - instance->last_released;
2662
2663 /*
2664 * Release retransmit list items if group aru indicates they are transmitted
2665 */
2666 for (i = 1; i <= range; i++) {
2667 void *ptr;
2668
2669 res = sq_item_get (&instance->regular_sort_queue,
2670 instance->last_released + i, &ptr);
2671 if (res == 0) {
2673 totemsrp_buffer_release (instance, regular_message->mcast);
2674 }
2675 sq_items_release (&instance->regular_sort_queue,
2676 instance->last_released + i);
2677
2678 log_release = 1;
2679 }
2680 instance->last_released += range;
2681
2682 if (log_release) {
2684 "releasing messages up to and including %x", release_to);
2685 }
2686}
2687
2688static void update_aru (
2689 struct totemsrp_instance *instance)
2690{
2691 unsigned int i;
2692 int res;
2693 struct sq *sort_queue;
2694 unsigned int range;
2695 unsigned int my_aru_saved = 0;
2696
2697 if (instance->memb_state == MEMB_STATE_RECOVERY) {
2698 sort_queue = &instance->recovery_sort_queue;
2699 } else {
2700 sort_queue = &instance->regular_sort_queue;
2701 }
2702
2703 range = instance->my_high_seq_received - instance->my_aru;
2704
2705 my_aru_saved = instance->my_aru;
2706 for (i = 1; i <= range; i++) {
2707
2708 void *ptr;
2709
2710 res = sq_item_get (sort_queue, my_aru_saved + i, &ptr);
2711 /*
2712 * If hole, stop updating aru
2713 */
2714 if (res != 0) {
2715 break;
2716 }
2717 }
2718 instance->my_aru += i - 1;
2719}
2720
2721/*
2722 * Multicasts pending messages onto the ring (requires orf_token possession)
2723 */
/*
 * Multicast pending messages while holding the ORF token.
 *
 * In MEMB_STATE_RECOVERY the recovery sort queue is used and the token
 * retransmit timer is re-armed; otherwise the regular sort queue is used and
 * new_message_queue is the source unless waiting_trans_ack is set. Each
 * message taken from the source queue is stamped with the next token
 * sequence number and the next global_seqno, tagged with my_ring_id, stored
 * in the sort queue for retransmission, removed from the pending queue and
 * my_high_seq_received is set to the token's sequence number. Afterwards
 * my_aru is updated. The return value is fcc_mcast_current.
 *
 * NOTE(review): the third parameter, the loop header that drives
 * fcc_mcast_current, the source queue selection for RECOVERY and for
 * waiting_trans_ack, the sort_queue_item setup and the start of the
 * transmit call are not visible in this listing.
 */
2724 static int orf_token_mcast (
2725 struct totemsrp_instance *instance,
2726 struct orf_token *token,
2728{
2729 struct message_item *message_item = 0;
2730 struct cs_queue *mcast_queue;
2731 struct sq *sort_queue;
2733 struct mcast *mcast;
2734 unsigned int fcc_mcast_current;
2735
2736 if (instance->memb_state == MEMB_STATE_RECOVERY) {
2738 sort_queue = &instance->recovery_sort_queue;
2739 reset_token_retransmit_timeout (instance); // REVIEWED
2740 } else {
2741 if (instance->waiting_trans_ack) {
2743 } else {
2744 mcast_queue = &instance->new_message_queue;
2745 }
2746
2747 sort_queue = &instance->regular_sort_queue;
2748 }
2749
2751 if (cs_queue_is_empty (mcast_queue)) {
2752 break;
2753 }
2754 message_item = (struct message_item *)cs_queue_item_get (mcast_queue);
2755
/* Assign the next ring sequence number and the next global sequence number. */
2756 message_item->mcast->seq = ++token->seq;
2757 message_item->mcast->this_seqno = instance->global_seqno++;
2758
2759 /*
2760 * Build IO vector
2761 */
2762 memset (&sort_queue_item, 0, sizeof (struct sort_queue_item));
2765
2767
2768 memcpy (&mcast->ring_id, &instance->my_ring_id, sizeof (struct memb_ring_id));
2769
2770 /*
2771 * Add message to retransmit queue
2772 */
2773 sq_item_add (sort_queue, &sort_queue_item, message_item->mcast->seq);
2774
2776 instance->totemnet_context,
2779
2780 /*
2781 * Delete item from pending queue
2782 */
2783 cs_queue_item_remove (mcast_queue);
2784
2785 /*
2786 * If messages mcasted, deliver any new messages to totempg
2787 */
2788 instance->my_high_seq_received = token->seq;
2789 }
2790
2791 update_aru (instance);
2792
2793 /*
2794 * Return 1 if more messages are available for single node clusters
2795 */
2796 return (fcc_mcast_current);
2797}
2798
2799/*
2800 * Remulticasts messages in orf_token's retransmit list (requires orf_token)
2801 * Modifies orf_token's rtr to include retransmits required by this process
 *
 * Phase 1: entries of the token's retransmit list whose ring_id matches
 * my_ring_id are passed to orf_token_remcast(); when that returns 0 the
 * entry is overwritten by shifting the following entries down, and
 * stats.mcast_retx and fcc_remcast_current are incremented.
 * Phase 2: sequence numbers after my_aru, up to orf_token->seq, are checked
 * against the sort queue; a sequence number that is not in use and not yet
 * in the retransmit list is added to it.
 * Returns instance->fcc_remcast_current.
 *
 * NOTE(review): this listing omits several lines (loop conditions, the
 * rtr_list assignment, list-length bookkeeping) - confirm against the
 * original file.
2802 */
2803static int orf_token_rtr (
2804 struct totemsrp_instance *instance,
2805 struct orf_token *orf_token,
2806 unsigned int *fcc_allowed)
2807{
2808 unsigned int res;
2809 unsigned int i, j;
2810 unsigned int found;
2811 struct sq *sort_queue;
2812 struct rtr_item *rtr_list;
2813 unsigned int range = 0;
2814 char retransmit_msg[1024];
2815 char value[64];
2816
2817 if (instance->memb_state == MEMB_STATE_RECOVERY) {
2818 sort_queue = &instance->recovery_sort_queue;
2819 } else {
2820 sort_queue = &instance->regular_sort_queue;
2821 }
2822
2824
 /* build a human-readable dump of the retransmit list for logging */
2825 strcpy (retransmit_msg, "Retransmit List: ");
2828 "Retransmit List %d", orf_token->rtr_list_entries);
2829 for (i = 0; i < orf_token->rtr_list_entries; i++) {
2830 sprintf (value, "%x ", rtr_list[i].seq);
2832 }
2833 strcat (retransmit_msg, "");
2835 "%s", retransmit_msg);
2836 }
2837
2838 /*
2839 * Retransmit messages on orf_token's RTR list from RTR queue
2840 */
2841 for (instance->fcc_remcast_current = 0, i = 0;
2843
2844 /*
2845 * If this retransmit request isn't from this configuration,
2846 * try next rtr entry
2847 */
2848 if (memcmp (&rtr_list[i].ring_id, &instance->my_ring_id,
2849 sizeof (struct memb_ring_id)) != 0) {
2850
2851 i += 1;
2852 continue;
2853 }
2854
2855 res = orf_token_remcast (instance, rtr_list[i].seq);
2856 if (res == 0) {
2857 /*
2858 * Multicasted message, so no need to copy to new retransmit list
2859 */
 /* i is not advanced here: the next entry is moved into slot i */
2862 memmove (&rtr_list[i], &rtr_list[i + 1],
2863 sizeof (struct rtr_item) * (orf_token->rtr_list_entries - i));
2864
2865 instance->stats.mcast_retx++;
2866 instance->fcc_remcast_current++;
2867 } else {
2868 i += 1;
2869 }
2870 }
2872
2873 /*
2874 * Add messages to retransmit to RTR list
2875 * but only retry if there is room in the retransmit list
2876 */
2877
2878 range = orf_token->seq - instance->my_aru;
2880
2882 (i <= range); i++) {
2883
2884 /*
2885 * Ensure message is within the sort queue range
2886 */
2887 res = sq_in_range (sort_queue, instance->my_aru + i);
2888 if (res == 0) {
2889 break;
2890 }
2891
2892 /*
2893 * Find if a message is missing from this processor
2894 */
2895 res = sq_item_inuse (sort_queue, instance->my_aru + i);
2896 if (res == 0) {
2897 /*
2898 * Determine how many times we have missed receiving
2899 * this sequence number. sq_item_miss_count increments
2900 * a counter for the sequence number. The miss count
2901 * will be returned and compared. This allows time for
2902 * delayed multicast messages to be received before
2903 * declaring the message is missing and requesting a
2904 * retransmit.
2905 */
2906 res = sq_item_miss_count (sort_queue, instance->my_aru + i);
2908 continue;
2909 }
2910
2911 /*
2912 * Determine if missing message is already in retransmit list
2913 */
2914 found = 0;
2915 for (j = 0; j < orf_token->rtr_list_entries; j++) {
2916 if (instance->my_aru + i == rtr_list[j].seq) {
2917 found = 1;
2918 }
2919 }
2920 if (found == 0) {
2921 /*
2922 * Missing message not found in current retransmit list so add it
2923 */
2925 &instance->my_ring_id, sizeof (struct memb_ring_id));
2928 }
2929 }
2930 }
2931 return (instance->fcc_remcast_current);
2932}
2933
/*
 * Counts a token transmission in stats.orf_token_tx and hands the saved
 * token copy (instance->orf_token_retransmit, orf_token_retransmit_size
 * bytes) to a send call.
 *
 * NOTE(review): the line naming that call is absent from this listing.
 */
2934static void token_retransmit (struct totemsrp_instance *instance)
2935{
2936 instance->stats.orf_token_tx++;
2938 instance->orf_token_retransmit,
2939 instance->orf_token_retransmit_size);
2940}
2941
2942/*
2943 * Retransmit the regular token if no mcast or token has
2944 * been received in retransmit token period retransmit
2945 * the token to the next processor
 *
 * Timer callback; data is the totemsrp instance. Nothing is done in
 * MEMB_STATE_GATHER. In the other visible case the token is retransmitted
 * and the retransmit timer is re-armed.
 *
 * NOTE(review): two lines between the COMMIT label and token_retransmit()
 * are absent from this listing (presumably further case labels) - confirm.
2946 */
2947static void timer_function_token_retransmit_timeout (void *data)
2948{
2949 struct totemsrp_instance *instance = data;
2950
2951 switch (instance->memb_state) {
2952 case MEMB_STATE_GATHER:
2953 break;
2954 case MEMB_STATE_COMMIT:
2957 token_retransmit (instance);
2958 reset_token_retransmit_timeout (instance); // REVIEWED
2959 break;
2960 }
2961}
2962
/*
 * Timer callback for the token-hold retransmit timer; data is the totemsrp
 * instance. Nothing is done in MEMB_STATE_GATHER or MEMB_STATE_COMMIT. The
 * remaining branch calls token_retransmit() and, unlike
 * timer_function_token_retransmit_timeout(), does not re-arm a timer.
 *
 * NOTE(review): the case labels guarding token_retransmit() are absent from
 * this listing.
 */
2963static void timer_function_token_hold_retransmit_timeout (void *data)
2964{
2965 struct totemsrp_instance *instance = data;
2966
2967 switch (instance->memb_state) {
2968 case MEMB_STATE_GATHER:
2969 break;
2970 case MEMB_STATE_COMMIT:
2971 break;
2974 token_retransmit (instance);
2975 break;
2976 }
2977}
2978
/*
 * Timer callback for merge detection; data is the totemsrp instance.
 * In the first switch branch a merge-detect message is transmitted, but only
 * when this node is the ring representative (my_ring_id.rep equals
 * my_id.nodeid). MEMB_STATE_GATHER and MEMB_STATE_COMMIT do nothing.
 *
 * NOTE(review): the statement before the switch and some case labels are
 * absent from this listing.
 */
2979static void timer_function_merge_detect_timeout(void *data)
2980{
2981 struct totemsrp_instance *instance = data;
2982
2984
2985 switch (instance->memb_state) {
2987 if (instance->my_ring_id.rep == instance->my_id.nodeid) {
2988 memb_merge_detect_transmit (instance);
2989 }
2990 break;
2991 case MEMB_STATE_GATHER:
2992 case MEMB_STATE_COMMIT:
2994 break;
2995 }
2996}
2997
2998/*
2999 * Send orf_token to next member (requires orf_token)
 *
 * The token size is the fixed orf_token header plus rtr_list_entries
 * rtr_item records, and header.nodeid is set to this node's id.
 * When forward_token is 0 nothing is sent and 0 is returned; otherwise
 * stats.orf_token_tx is incremented and res is returned.
 *
 * NOTE(review): the lines between the nodeid assignment and the
 * forward_token test, and the send call itself, are absent from this
 * listing.
3000 */
3001static int token_send (
3002 struct totemsrp_instance *instance,
3003 struct orf_token *orf_token,
3004 int forward_token)
3005{
3006 int res = 0;
3007 unsigned int orf_token_size;
3008
3009 orf_token_size = sizeof (struct orf_token) +
3010 (orf_token->rtr_list_entries * sizeof (struct rtr_item));
3011
3012 orf_token->header.nodeid = instance->my_id.nodeid;
3016
3017 if (forward_token == 0) {
3018 return (0);
3019 }
3020
3021 instance->stats.orf_token_tx++;
3023 orf_token,
3025
3026 return (res);
3027}
3028
/*
 * Sends a token_hold_cancel message if this node currently holds the token.
 *
 * Returns 0 immediately when my_token_held is 0. Otherwise clears
 * my_token_held, builds the message, increments stats.token_hold_cancel_tx
 * and passes sizeof (struct token_hold_cancel) bytes to a send call.
 * Always returns 0.
 *
 * NOTE(review): the message declaration, most of the message construction
 * and the name of the send call are absent from this listing.
 */
3029static int token_hold_cancel_send (struct totemsrp_instance *instance)
3030{
3032
3033 /*
3034 * Only cancel if the token is currently held
3035 */
3036 if (instance->my_token_held == 0) {
3037 return (0);
3038 }
3039 instance->my_token_held = 0;
3040
3041 /*
3042 * Build message
3043 */
3050 sizeof (struct memb_ring_id));
3052
3053 instance->stats.token_hold_cancel_tx++;
3054
3056 sizeof (struct token_hold_cancel));
3057
3058 return (0);
3059}
3060
/*
 * Builds a fresh orf_token on the stack and sends it with forwarding
 * enabled (token_send (..., 1)).
 *
 * The token carries this node's id, aru 0 with aru_addr set to this node,
 * my_ring_id, and zeroed fcc and backlog. my_set_retrans_flg ends up 0 when
 * retrans_message_queue is empty and 1 otherwise.
 * Returns the result of token_send().
 *
 * NOTE(review): several field assignments are absent from this listing.
 */
3061static int orf_token_send_initial (struct totemsrp_instance *instance)
3062{
3063 struct orf_token orf_token;
3064 int res;
3065
3070 orf_token.header.nodeid = instance->my_id.nodeid;
 /* overwritten by both branches of the queue check below */
3075 instance->my_set_retrans_flg = 1;
3076
3077 if (cs_queue_is_empty (&instance->retrans_message_queue) == 1) {
3079 instance->my_set_retrans_flg = 0;
3080 } else {
3082 instance->my_set_retrans_flg = 1;
3083 }
3084
3085 orf_token.aru = 0;
3087 orf_token.aru_addr = instance->my_id.nodeid;
3088
3089 memcpy (&orf_token.ring_id, &instance->my_ring_id, sizeof (struct memb_ring_id));
3090 orf_token.fcc = 0;
3091 orf_token.backlog = 0;
3092
3094
3095 res = token_send (instance, &orf_token, 1);
3096
3097 return (res);
3098}
3099
/*
 * Records this node's state in the commit token and advances the token.
 *
 * - copies the token's address list into my_new_memb_list and sets
 *   my_new_memb_entries;
 * - fills the member entry at commit_token->memb_index with my_old_ring_id,
 *   old_ring_state_aru, my_received_flg and my_high_delivered;
 * - among entries 0..memb_index that share this entry's ring id, finds the
 *   highest aru and clears received_flg for every entry below it (and
 *   my_received_flg when that entry is this node's own);
 * - sets header.nodeid to this node's id and increments memb_index.
 *
 * NOTE(review): the declaration and assignment of memb_list are absent from
 * this listing.
 */
3100static void memb_state_commit_token_update (
3101 struct totemsrp_instance *instance)
3102{
3103 struct srp_addr *addr;
3105 unsigned int high_aru;
3106 unsigned int i;
3107
3108 addr = (struct srp_addr *)instance->commit_token->end_of_commit_token;
3110
3111 memcpy (instance->my_new_memb_list, addr,
3112 sizeof (struct srp_addr) * instance->commit_token->addr_entries);
3113
3114 instance->my_new_memb_entries = instance->commit_token->addr_entries;
3115
3116 memcpy (&memb_list[instance->commit_token->memb_index].ring_id,
3117 &instance->my_old_ring_id, sizeof (struct memb_ring_id));
3118
3119 memb_list[instance->commit_token->memb_index].aru = instance->old_ring_state_aru;
3120 /*
3121 * TODO high delivered is really instance->my_aru, but with safe this
3122 * could change?
3123 */
3124 instance->my_received_flg =
3125 (instance->my_aru == instance->my_high_seq_received);
3126
3127 memb_list[instance->commit_token->memb_index].received_flg = instance->my_received_flg;
3128
3129 memb_list[instance->commit_token->memb_index].high_delivered = instance->my_high_delivered;
3130 /*
3131 * find high aru up to current memb_index for all matching ring ids
3132 * if any ring id matching memb_index has aru less then high aru set
3133 * received flag for that entry to false
3134 */
3135 high_aru = memb_list[instance->commit_token->memb_index].aru;
3136 for (i = 0; i <= instance->commit_token->memb_index; i++) {
3137 if (memcmp (&memb_list[instance->commit_token->memb_index].ring_id,
3138 &memb_list[i].ring_id,
3139 sizeof (struct memb_ring_id)) == 0) {
3140
3141 if (sq_lt_compare (high_aru, memb_list[i].aru)) {
3142 high_aru = memb_list[i].aru;
3143 }
3144 }
3145 }
3146
3147 for (i = 0; i <= instance->commit_token->memb_index; i++) {
3148 if (memcmp (&memb_list[instance->commit_token->memb_index].ring_id,
3149 &memb_list[i].ring_id,
3150 sizeof (struct memb_ring_id)) == 0) {
3151
3152 if (sq_lt_compare (memb_list[i].aru, high_aru)) {
3153 memb_list[i].received_flg = 0;
3154 if (i == instance->commit_token->memb_index) {
3155 instance->my_received_flg = 0;
3156 }
3157 }
3158 }
3159 }
3160
3161 instance->commit_token->header.nodeid = instance->my_id.nodeid;
3162 instance->commit_token->memb_index += 1;
3163 assert (instance->commit_token->memb_index <= instance->commit_token->addr_entries);
3164 assert (instance->commit_token->header.nodeid);
3165}
3166
/*
 * Selects the destination for the commit token: the node id stored in the
 * token's address list at index (memb_index modulo addr_entries), so the
 * index wraps back to the first entry after the last one.
 *
 * NOTE(review): the name of the call receiving totemnet_context and the
 * node id is absent from this listing.
 */
3167static void memb_state_commit_token_target_set (
3168 struct totemsrp_instance *instance)
3169{
3170 struct srp_addr *addr;
3171
3172 addr = (struct srp_addr *)instance->commit_token->end_of_commit_token;
3173
3174 /* Totemnet just looks at the node id */
3176 instance->totemnet_context,
3177 addr[instance->commit_token->memb_index %
3178 instance->commit_token->addr_entries].nodeid);
3179}
3180
/*
 * Sends the caller-supplied commit token.
 *
 * Increments commit_token->token_seq, stamps header.nodeid with this node's
 * id, copies commit_token_size bytes of the token into
 * instance->orf_token_retransmit for later retransmission, increments
 * stats.memb_commit_token_tx and re-arms the token retransmit timer.
 * Always returns 0.
 *
 * NOTE(review): the tail of the size computation and the send call are
 * absent from this listing.
 */
3181static int memb_state_commit_token_send_recovery (
3182 struct totemsrp_instance *instance,
3183 struct memb_commit_token *commit_token)
3184{
3185 unsigned int commit_token_size;
3186
3187 commit_token->token_seq++;
3188 commit_token->header.nodeid = instance->my_id.nodeid;
3189 commit_token_size = sizeof (struct memb_commit_token) +
3190 ((sizeof (struct srp_addr) +
3192 /*
3193 * Make a copy for retransmission if necessary
3194 */
3195 memcpy (instance->orf_token_retransmit, commit_token, commit_token_size);
3197
3198 instance->stats.memb_commit_token_tx++;
3199
3201 commit_token,
3203
3204 /*
3205 * Request retransmission of the commit token in case it is lost
3206 */
3207 reset_token_retransmit_timeout (instance);
3208 return (0);
3209}
3210
/*
 * Sends the instance's own commit token (instance->commit_token).
 *
 * Increments its token_seq, stamps header.nodeid with this node's id,
 * increments stats.memb_commit_token_tx and re-arms the token retransmit
 * timer. Always returns 0.
 *
 * NOTE(review): the tail of the size computation, the retransmission copy
 * and the send call are absent from this listing.
 */
3211static int memb_state_commit_token_send (
3212 struct totemsrp_instance *instance)
3213{
3214 unsigned int commit_token_size;
3215
3216 instance->commit_token->token_seq++;
3217 instance->commit_token->header.nodeid = instance->my_id.nodeid;
3218 commit_token_size = sizeof (struct memb_commit_token) +
3219 ((sizeof (struct srp_addr) +
3221 /*
3222 * Make a copy for retransmission if necessary
3223 */
3226
3227 instance->stats.memb_commit_token_tx++;
3228
3230 instance->commit_token,
3232
3233 /*
3234 * Request retransmission of the commit token in case it is lost
3235 */
3236 reset_token_retransmit_timeout (instance);
3237 return (0);
3238}
3239
3240
/*
 * Reports whether this node is the representative of the prospective
 * configuration.
 *
 * token_memb is computed as my_proc_list minus my_failed_list. Returns
 * non-zero when lowest_nodeid, taken from that set, equals this node's id,
 * and 0 otherwise.
 *
 * NOTE(review): the token_memb declaration, the statement before the first
 * element is read, and the comparison inside the loop are absent from this
 * listing.
 */
3241static int memb_lowest_in_config (struct totemsrp_instance *instance)
3242{
3244 int token_memb_entries = 0;
3245 int i;
3246 unsigned int lowest_nodeid;
3247
3248 memb_set_subtract (token_memb, &token_memb_entries,
3249 instance->my_proc_list, instance->my_proc_list_entries,
3250 instance->my_failed_list, instance->my_failed_list_entries);
3251
3252 /*
3253 * find representative by searching for smallest identifier
3254 */
3256
3257 lowest_nodeid = token_memb[0].nodeid;
3258 for (i = 1; i < token_memb_entries; i++) {
3260 lowest_nodeid = token_memb[i].nodeid;
3261 }
3262 }
3263 return (lowest_nodeid == instance->my_id.nodeid);
3264}
3265
3266static int srp_addr_compare (const void *a, const void *b)
3267{
3268 const struct srp_addr *srp_a = (const struct srp_addr *)a;
3269 const struct srp_addr *srp_b = (const struct srp_addr *)b;
3270
3271 if (srp_a->nodeid < srp_b->nodeid) {
3272 return -1;
3273 } else if (srp_a->nodeid > srp_b->nodeid) {
3274 return 1;
3275 } else {
3276 return 0;
3277 }
3278}
3279
/*
 * Builds a new commit token in instance->commit_token; the log text shows
 * this is done by the node acting as representative.
 *
 * The member set is my_proc_list minus my_failed_list, sorted with
 * srp_addr_compare(). The token header is zeroed, header.nodeid and
 * ring_id.rep are set to this node's id, ring_id.seq is set to
 * token_ring_id_seq + 4, and memb_index starts at 0.
 *
 * NOTE(review): the token_memb / memb_list declarations, several header
 * assignments and the copies into the token's trailing arrays are only
 * partly present in this listing.
 */
3280static void memb_state_commit_token_create (
3281 struct totemsrp_instance *instance)
3282{
3284 struct srp_addr *addr;
3286 int token_memb_entries = 0;
3287
3289 "Creating commit token because I am the rep.");
3290
3291 memb_set_subtract (token_memb, &token_memb_entries,
3292 instance->my_proc_list, instance->my_proc_list_entries,
3293 instance->my_failed_list, instance->my_failed_list_entries);
3294
3295 memset (instance->commit_token, 0, sizeof (struct memb_commit_token));
3299 instance->commit_token->header.encapsulated = 0;
3300 instance->commit_token->header.nodeid = instance->my_id.nodeid;
3301 assert (instance->commit_token->header.nodeid);
3302
3303 instance->commit_token->ring_id.rep = instance->my_id.nodeid;
3304 instance->commit_token->ring_id.seq = instance->token_ring_id_seq + 4;
3305
3306 /*
3307 * This qsort is necessary to ensure the commit token traverses
3308 * the ring in the proper order
3309 */
3310 qsort (token_memb, token_memb_entries, sizeof (struct srp_addr),
3311 srp_addr_compare);
3312
3313 instance->commit_token->memb_index = 0;
3315
3316 addr = (struct srp_addr *)instance->commit_token->end_of_commit_token;
3318
3320 token_memb_entries * sizeof (struct srp_addr));
3321 memset (memb_list, 0,
3323}
3324
/*
 * Builds and sends a memb_join message describing this node's view.
 *
 * The message is assembled in a 40000-byte stack buffer: the memb_join
 * header, followed by my_proc_list, followed by my_failed_list. If the
 * computed length exceeds the buffer the function returns without sending.
 * When send_join_timeout is non-zero the send is preceded by a random
 * usleep() of less than send_join_timeout * 1000 microseconds.
 * stats.memb_join_tx is incremented and addr_idx bytes are handed to the
 * send call.
 *
 * NOTE(review): several header assignments, the log call and the name of
 * the send call are absent from this listing.
 */
3325static void memb_join_message_send (struct totemsrp_instance *instance)
3326{
3327 char memb_join_data[40000];
3328 struct memb_join *memb_join = (struct memb_join *)memb_join_data;
3329 char *addr;
3330 unsigned int addr_idx;
3331 size_t msg_len;
3332
3337 memb_join->header.nodeid = instance->my_id.nodeid;
3339
3340 msg_len = sizeof(struct memb_join) +
3341 ((instance->my_proc_list_entries + instance->my_failed_list_entries) * sizeof(struct srp_addr));
3342
3343 if (msg_len > sizeof(memb_join_data)) {
3345 "memb_join_message too long. Ignoring message.");
3346
3347 return ;
3348 }
3349
3350 memb_join->ring_seq = instance->my_ring_id.seq;
3353 memb_join->system_from = instance->my_id;
3354
3355 /*
3356 * This mess adds the joined and failed processor lists into the join
3357 * message
3358 */
3359 addr = (char *)memb_join;
3360 addr_idx = sizeof (struct memb_join);
3361 memcpy (&addr[addr_idx],
3362 instance->my_proc_list,
3363 instance->my_proc_list_entries *
3364 sizeof (struct srp_addr));
3365 addr_idx +=
3366 instance->my_proc_list_entries *
3367 sizeof (struct srp_addr);
3368 memcpy (&addr[addr_idx],
3369 instance->my_failed_list,
3370 instance->my_failed_list_entries *
3371 sizeof (struct srp_addr));
3372 addr_idx +=
3373 instance->my_failed_list_entries *
3374 sizeof (struct srp_addr);
3375
3376 if (instance->totem_config->send_join_timeout) {
3377 usleep (random() % (instance->totem_config->send_join_timeout * 1000));
3378 }
3379
3380 instance->stats.memb_join_tx++;
3381
3383 instance->totemnet_context,
3384 memb_join,
3385 addr_idx);
3386}
3387
/*
 * Builds and sends a join message announcing that this node is leaving.
 *
 * This node's id is merged into my_failed_list, and active_memb is computed
 * as my_proc_list minus this node. The message is assembled in a 40000-byte
 * stack buffer; if the computed length exceeds the buffer the function
 * returns without sending. As in memb_join_message_send(), an optional
 * random delay precedes the send and stats.memb_join_tx is incremented.
 *
 * NOTE(review): the active_memb declarations, header assignments, the
 * source of the first list copy and the name of the send call are absent
 * from this listing.
 */
3388static void memb_leave_message_send (struct totemsrp_instance *instance)
3389{
3390 char memb_join_data[40000];
3391 struct memb_join *memb_join = (struct memb_join *)memb_join_data;
3392 char *addr;
3393 unsigned int addr_idx;
3396 size_t msg_len;
3397
3399 "sending join/leave message");
3400
3401 /*
3402 * add us to the failed list, and remove us from
3403 * the members list
3404 */
3405 memb_set_merge(
3406 &instance->my_id, 1,
3407 instance->my_failed_list, &instance->my_failed_list_entries);
3408
3409 memb_set_subtract (active_memb, &active_memb_entries,
3410 instance->my_proc_list, instance->my_proc_list_entries,
3411 &instance->my_id, 1);
3412
3413 msg_len = sizeof(struct memb_join) +
3414 ((active_memb_entries + instance->my_failed_list_entries) * sizeof(struct srp_addr));
3415
3416 if (msg_len > sizeof(memb_join_data)) {
3418 "memb_leave message too long. Ignoring message.");
3419
3420 return ;
3421 }
3422
3428
3429 memb_join->ring_seq = instance->my_ring_id.seq;
3432 memb_join->system_from = instance->my_id;
3433
3434 // TODO: CC Maybe use the actual join send routine.
3435 /*
3436 * This mess adds the joined and failed processor lists into the join
3437 * message
3438 */
3439 addr = (char *)memb_join;
3440 addr_idx = sizeof (struct memb_join);
3441 memcpy (&addr[addr_idx],
3444 sizeof (struct srp_addr));
3445 addr_idx +=
3447 sizeof (struct srp_addr);
3448 memcpy (&addr[addr_idx],
3449 instance->my_failed_list,
3450 instance->my_failed_list_entries *
3451 sizeof (struct srp_addr));
3452 addr_idx +=
3453 instance->my_failed_list_entries *
3454 sizeof (struct srp_addr);
3455
3456
3457 if (instance->totem_config->send_join_timeout) {
3458 usleep (random() % (instance->totem_config->send_join_timeout * 1000));
3459 }
3460 instance->stats.memb_join_tx++;
3461
3463 instance->totemnet_context,
3464 memb_join,
3465 addr_idx);
3466}
3467
/*
 * Transmits a memb_merge_detect message and increments
 * stats.memb_merge_detect_tx.
 *
 * NOTE(review): almost all of the body (message declaration, field setup
 * and the send call) is absent from this listing; only the
 * sizeof (struct memb_ring_id) and sizeof (struct memb_merge_detect)
 * arguments remain visible.
 */
3468static void memb_merge_detect_transmit (struct totemsrp_instance *instance)
3469{
3471
3479 sizeof (struct memb_ring_id));
3481
3482 instance->stats.memb_merge_detect_tx++;
3485 sizeof (struct memb_merge_detect));
3486}
3487
3488static void memb_ring_id_set (
3489 struct totemsrp_instance *instance,
3490 const struct memb_ring_id *ring_id)
3491{
3492
3493 memcpy (&instance->my_ring_id, ring_id, sizeof (struct memb_ring_id));
3494}
3495
3497 void *srp_context,
3498 void **handle_out,
3500 int delete,
3501 int (*callback_fn) (enum totem_callback_token_type type, const void *),
3502 const void *data)
3503{
 /*
  * Registers a token callback.
  *
  * Any token hold is cancelled first. A token_callback_instance is then
  * allocated, returned through handle_out, filled with callback_fn, data,
  * type and delete, and linked onto either the "received" or the "sent"
  * callback list depending on type.
  * Returns -1 when malloc() fails, 0 otherwise.
  *
  * NOTE(review): the line carrying the function name, the type parameter
  * and the case labels are absent from this listing.
  */
3504 struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
3505 struct token_callback_instance *callback_handle;
3506
3507 token_hold_cancel_send (instance);
3508
3509 callback_handle = malloc (sizeof (struct token_callback_instance));
3510 if (callback_handle == 0) {
3511 return (-1);
3512 }
3513 *handle_out = (void *)callback_handle;
3514 qb_list_init (&callback_handle->list);
3515 callback_handle->callback_fn = callback_fn;
3516 callback_handle->data = (void *) data;
3517 callback_handle->callback_type = type;
3518 callback_handle->delete = delete;
3519 switch (type) {
3521 qb_list_add (&callback_handle->list, &instance->token_callback_received_listhead);
3522 break;
3524 qb_list_add (&callback_handle->list, &instance->token_callback_sent_listhead);
3525 break;
3526 }
3527
3528 return (0);
3529}
3530
3532{
 /*
  * Unregisters a token callback: when *handle_out is non-NULL the handle
  * is unlinked from its list, freed, and *handle_out is cleared so the
  * caller's pointer cannot be reused.
  *
  * NOTE(review): the line carrying the function name and parameters, and
  * the assignment of h, are absent from this listing.
  */
3533 struct token_callback_instance *h;
3534
3535 if (*handle_out) {
3537 qb_list_del (&h->list);
3538 free (h);
3539 h = NULL;
3540 *handle_out = 0;
3541 }
3542}
3543
/*
 * Runs the registered token callbacks of the given type.
 *
 * An unknown type trips assert (0). For each callback, an entry whose del
 * value is 1 is unlinked from the list before the outcome in res is
 * examined; the final if/else distinguishes a failed one-shot callback
 * (res == -1 and del == 1) from other entries with del set.
 *
 * NOTE(review): the type parameter, the case labels, the list iteration,
 * the callback invocation and the bodies of the final if/else are absent
 * from this listing - confirm against the original file.
 */
3544static void token_callbacks_execute (
3545 struct totemsrp_instance *instance,
3547{
3548 struct qb_list_head *list, *tmp_iter;
3549 struct qb_list_head *callback_listhead = 0;
3551 int res;
3552 int del;
3553
3554 switch (type) {
3557 break;
3560 break;
3561 default:
3562 assert (0);
3563 }
3564
3568 if (del == 1) {
3569 qb_list_del (list);
3570 }
3571
3575 /*
3576 * This callback failed to execute, try it again on the next token
3577 */
3578 if (res == -1 && del == 1) {
3580 } else if (del) {
3582 }
3583 }
3584}
3585
3586/*
3587 * Flow control functions
3588 */
/*
 * Returns the number of used entries in the queue this node would currently
 * transmit from, and records the value in
 * stats.token[stats.latest_token].backlog_calc.
 *
 * In MEMB_STATE_OPERATIONAL with waiting_trans_ack clear the queue is
 * new_message_queue; in MEMB_STATE_RECOVERY it is retrans_message_queue.
 * When no queue is selected the backlog is 0.
 *
 * NOTE(review): the assignment made when waiting_trans_ack is set is absent
 * from this listing.
 */
3589static unsigned int backlog_get (struct totemsrp_instance *instance)
3590{
3591 unsigned int backlog = 0;
3592 struct cs_queue *queue_use = NULL;
3593
3594 if (instance->memb_state == MEMB_STATE_OPERATIONAL) {
3595 if (instance->waiting_trans_ack) {
3597 } else {
3598 queue_use = &instance->new_message_queue;
3599 }
3600 } else
3601 if (instance->memb_state == MEMB_STATE_RECOVERY) {
3602 queue_use = &instance->retrans_message_queue;
3603 }
3604
3605 if (queue_use != NULL) {
3606 backlog = cs_queue_used (queue_use);
3607 }
3608
3609 instance->stats.token[instance->stats.latest_token].backlog_calc = backlog;
3610 return (backlog);
3611}
3612
/*
 * Computes how many messages this node may transmit on this token visit.
 *
 * transmits_allowed is capped at window_size - token->fcc. The current
 * backlog is stored in my_cbl. When token->backlog + my_cbl - my_pbl is
 * non-zero, a share of window_size proportional to my_pbl is computed and,
 * if it is positive and smaller, it replaces transmits_allowed.
 * Returns transmits_allowed.
 *
 * NOTE(review): the initial assignment of transmits_allowed is absent from
 * this listing.
 */
3613static int fcc_calculate (
3614 struct totemsrp_instance *instance,
3615 struct orf_token *token)
3616{
3617 unsigned int transmits_allowed;
3618 unsigned int backlog_calc;
3619
3621
3622 if (transmits_allowed > instance->totem_config->window_size - token->fcc) {
3623 transmits_allowed = instance->totem_config->window_size - token->fcc;
3624 }
3625
3626 instance->my_cbl = backlog_get (instance);
3627
3628 /*
3629 * Only do backlog calculation if there is a backlog otherwise
3630 * we would result in div by zero
3631 */
3632 if (token->backlog + instance->my_cbl - instance->my_pbl) {
3633 backlog_calc = (instance->totem_config->window_size * instance->my_pbl) /
3634 (token->backlog + instance->my_cbl - instance->my_pbl);
3635 if (backlog_calc > 0 && transmits_allowed > backlog_calc) {
3636 transmits_allowed = backlog_calc;
3637 }
3638 }
3639
3640 return (transmits_allowed);
3641}
3642
3643/*
3644 * don't overflow the RTR sort queue
 *
 * Sets *transmits_allowed to 0 when a limit derived from last_released and
 * window_size compares as less than token->seq (sq_lt_compare).
 *
 * NOTE(review): the declaration and computation of check, and part of the
 * compared expression, are absent from this listing.
3645 */
3646static void fcc_rtr_limit (
3647 struct totemsrp_instance *instance,
3648 struct orf_token *token,
3649 unsigned int *transmits_allowed)
3650{
3653 assert (check >= 0);
3654 if (sq_lt_compare (instance->last_released +
3656 instance->totem_config->window_size,
3657
3658 token->seq)) {
3659
3660 *transmits_allowed = 0;
3661 }
3662}
3663
3664static void fcc_token_update (
3665 struct totemsrp_instance *instance,
3666 struct orf_token *token,
3667 unsigned int msgs_transmitted)
3668{
3669 token->fcc += msgs_transmitted - instance->my_trc;
3670 token->backlog += instance->my_cbl - instance->my_pbl;
3671 instance->my_trc = msgs_transmitted;
3672 instance->my_pbl = instance->my_cbl;
3673}
3674
3675/*
3676 * Sanity checkers
3677 */
/*
 * Validates the length of a received orf_token message.
 *
 * Returns -1 when msg_len exceeds max_msg_len, is smaller than
 * sizeof (struct orf_token), when the rtr entry count is judged corrupted,
 * or when msg_len is smaller than the header plus rtr_entries rtr_item
 * records. Returns 0 when all checks pass.
 *
 * NOTE(review): the last parameter, the log calls, the extraction of
 * rtr_entries and the corruption test are absent from this listing.
 */
3678static int check_orf_token_sanity(
3679 const struct totemsrp_instance *instance,
3680 const void *msg,
3681 size_t msg_len,
3682 size_t max_msg_len,
3684{
3685 int rtr_entries;
3686 const struct orf_token *token = (const struct orf_token *)msg;
3687 size_t required_len;
3688
3689 if (msg_len > max_msg_len) {
3691 "Received orf_token message is too long... ignoring.");
3692
3693 return (-1);
3694 }
3695
3696 if (msg_len < sizeof(struct orf_token)) {
3698 "Received orf_token message is too short... ignoring.");
3699
3700 return (-1);
3701 }
3702
3705 } else {
3707 }
3708
3711 "Received orf_token message rtr_entries is corrupted... ignoring.");
3712
3713 return (-1);
3714 }
3715
3716 required_len = sizeof(struct orf_token) + rtr_entries * sizeof(struct rtr_item);
3717 if (msg_len < required_len) {
3719 "Received orf_token message is too short... ignoring.");
3720
3721 return (-1);
3722 }
3723
3724 return (0);
3725}
3726
/*
 * Validates the length of a received mcast message: returns -1 when msg_len
 * is smaller than sizeof (struct mcast), 0 otherwise.
 *
 * NOTE(review): the last parameter and the log call are absent from this
 * listing.
 */
3727static int check_mcast_sanity(
3728 struct totemsrp_instance *instance,
3729 const void *msg,
3730 size_t msg_len,
3732{
3733
3734 if (msg_len < sizeof(struct mcast)) {
3736 "Received mcast message is too short... ignoring.");
3737
3738 return (-1);
3739 }
3740
3741 return (0);
3742}
3743
/*
 * Validates the length of a received memb_merge_detect message: returns -1
 * when msg_len is smaller than sizeof (struct memb_merge_detect), 0
 * otherwise.
 *
 * NOTE(review): the last parameter and the log call are absent from this
 * listing.
 */
3744static int check_memb_merge_detect_sanity(
3745 struct totemsrp_instance *instance,
3746 const void *msg,
3747 size_t msg_len,
3749{
3750
3751 if (msg_len < sizeof(struct memb_merge_detect)) {
3753 "Received memb_merge_detect message is too short... ignoring.");
3754
3755 return (-1);
3756 }
3757
3758 return (0);
3759}
3760
/*
 * Validates the length of a received memb_join message.
 *
 * Returns -1 when msg_len is smaller than sizeof (struct memb_join), or
 * smaller than the header plus (proc_list_entries + failed_list_entries)
 * srp_addr records as announced by the message. Returns 0 otherwise.
 *
 * NOTE(review): the last parameter, the log calls and the lines between
 * reading the entry counts and computing required_len are absent from this
 * listing.
 */
3761static int check_memb_join_sanity(
3762 struct totemsrp_instance *instance,
3763 const void *msg,
3764 size_t msg_len,
3766{
3767 const struct memb_join *mj_msg = (const struct memb_join *)msg;
3768 unsigned int proc_list_entries;
3769 unsigned int failed_list_entries;
3770 size_t required_len;
3771
3772 if (msg_len < sizeof(struct memb_join)) {
3774 "Received memb_join message is too short... ignoring.");
3775
3776 return (-1);
3777 }
3778
3779 proc_list_entries = mj_msg->proc_list_entries;
3780 failed_list_entries = mj_msg->failed_list_entries;
3781
3785 }
3786
3787 required_len = sizeof(struct memb_join) + ((proc_list_entries + failed_list_entries) * sizeof(struct srp_addr));
3788 if (msg_len < required_len) {
3790 "Received memb_join message is too short... ignoring.");
3791
3792 return (-1);
3793 }
3794
3795 return (0);
3796}
3797
/*
 * Validates the length of a received memb_commit_token message.
 *
 * Returns -1 when msg_len is smaller than sizeof (struct memb_commit_token)
 * or smaller than required_len, which is derived from the addr_entries
 * count announced by the message. Returns 0 when the message is long enough.
 *
 * NOTE(review): the last parameter, the log calls, the lines after
 * addr_entries is read and the tail of the required_len expression are
 * absent from this listing; the visible lines are kept as they are.
 */
3798static int check_memb_commit_token_sanity(
3799 struct totemsrp_instance *instance,
3800 const void *msg,
3801 size_t msg_len,
3803{
3804 const struct memb_commit_token *mct_msg = (const struct memb_commit_token *)msg;
3805 unsigned int addr_entries;
3806 size_t required_len;
3807
3808 if (msg_len < sizeof(struct memb_commit_token)) {
3810 "Received memb_commit_token message is too short... ignoring.");
3811
 /*
  * Must fail like every other length check here: returning 0 would
  * report a truncated token as sane even though the log says it is
  * being ignored.
  */
3812 return (-1);
3813 }
3814
3815 addr_entries= mct_msg->addr_entries;
3818 }
3819
3820 required_len = sizeof(struct memb_commit_token) +
3822 if (msg_len < required_len) {
3824 "Received memb_commit_token message is too short... ignoring.");
3825
3826 return (-1);
3827 }
3828
3829 return (0);
3830}
3831
/*
 * Validates the length of a received token_hold_cancel message: returns -1
 * when msg_len is smaller than sizeof (struct token_hold_cancel), 0
 * otherwise.
 *
 * NOTE(review): the last parameter and the log call are absent from this
 * listing.
 */
3832static int check_token_hold_cancel_sanity(
3833 struct totemsrp_instance *instance,
3834 const void *msg,
3835 size_t msg_len,
3837{
3838
3839 if (msg_len < sizeof(struct token_hold_cancel)) {
3841 "Received token_hold_cancel message is too short... ignoring.");
3842
3843 return (-1);
3844 }
3845
3846 return (0);
3847}
3848
3849/*
3850 * Message Handlers
3851 */
3852
3853#ifdef GIVEINFO
3854uint64_t tv_old;
3855#endif
3856/*
3857 * message handler called when TOKEN message type received
3858 */
3859static int message_handler_orf_token (
3860 struct totemsrp_instance *instance,
3861 const void *msg,
3862 size_t msg_len,
3864{
3865 char token_storage[1500];
3866 char token_convert[1500];
3867 struct orf_token *token = NULL;
3868 int forward_token;
3869 unsigned int transmits_allowed;
3870 unsigned int mcasted_retransmit;
3871 unsigned int mcasted_regular;
3872 unsigned int last_aru;
3873
3874#ifdef GIVEINFO
3877
3879 tv_diff = tv_current - tv_old;
3880 tv_old = tv_current;
3881
3883 "Time since last token %0.4f ms", tv_diff / (float)QB_TIME_NS_IN_MSEC);
3884#endif
3885
3886 if (check_orf_token_sanity(instance, msg, msg_len, sizeof(token_storage),
3887 endian_conversion_needed) == -1) {
3888 return (0);
3889 }
3890
3891 if (instance->orf_token_discard) {
3892 return (0);
3893 }
3894#ifdef TEST_DROP_ORF_TOKEN_PERCENTAGE
3896 return (0);
3897 }
3898#endif
3899
3901 orf_token_endian_convert ((struct orf_token *)msg,
3902 (struct orf_token *)token_convert);
3903 msg = (struct orf_token *)token_convert;
3904 }
3905
3906 /*
3907 * Make copy of token and retransmit list in case we have
3908 * to flush incoming messages from the kernel queue
3909 */
3910 token = (struct orf_token *)token_storage;
3911 memcpy (token, msg, sizeof (struct orf_token));
3912 memcpy (&token->rtr_list[0], (char *)msg + sizeof (struct orf_token),
3913 sizeof (struct rtr_item) * RETRANSMIT_ENTRIES_MAX);
3914
3915
3916 /*
3917 * Handle merge detection timeout
3918 */
3919 if (token->seq == instance->my_last_seq) {
3920 start_merge_detect_timeout (instance);
3921 instance->my_seq_unchanged += 1;
3922 } else {
3923 cancel_merge_detect_timeout (instance);
3924 cancel_token_hold_retransmit_timeout (instance);
3925 instance->my_seq_unchanged = 0;
3926 }
3927
3928 instance->my_last_seq = token->seq;
3929
3930#ifdef TEST_RECOVERY_MSG_COUNT
3931 if (instance->memb_state == MEMB_STATE_OPERATIONAL && token->seq > TEST_RECOVERY_MSG_COUNT) {
3932 return (0);
3933 }
3934#endif
3935 instance->flushing = 1;
3937 instance->flushing = 0;
3938
3939 /*
3940 * Determine if we should hold (in reality drop) the token
3941 */
3942 instance->my_token_held = 0;
3943 if (instance->my_ring_id.rep == instance->my_id.nodeid &&
3944 instance->my_seq_unchanged > instance->totem_config->seqno_unchanged_const) {
3945 instance->my_token_held = 1;
3946 } else {
3947 if (instance->my_ring_id.rep != instance->my_id.nodeid &&
3948 instance->my_seq_unchanged >= instance->totem_config->seqno_unchanged_const) {
3949 instance->my_token_held = 1;
3950 }
3951 }
3952
3953 /*
3954 * Hold onto token when there is no activity on ring and
3955 * this processor is the ring rep
3956 */
3957 forward_token = 1;
3958 if (instance->my_ring_id.rep == instance->my_id.nodeid) {
3959 if (instance->my_token_held) {
3960 forward_token = 0;
3961 }
3962 }
3963
3964 switch (instance->memb_state) {
3965 case MEMB_STATE_COMMIT:
3966 /* Discard token */
3967 break;
3968
3970 messages_free (instance, token->aru);
3971 /*
3972 * Do NOT add break, this case should also execute code in gather case.
3973 */
3974
3975 case MEMB_STATE_GATHER:
3976 /*
3977 * DO NOT add break, we use different free mechanism in recovery state
3978 */
3979
3981 /*
3982 * Discard tokens from another configuration
3983 */
3984 if (memcmp (&token->ring_id, &instance->my_ring_id,
3985 sizeof (struct memb_ring_id)) != 0) {
3986
3987 if ((forward_token)
3988 && instance->use_heartbeat) {
3989 reset_heartbeat_timeout(instance);
3990 }
3991 else {
3992 cancel_heartbeat_timeout(instance);
3993 }
3994
3995 return (0); /* discard token */
3996 }
3997
3998 /*
3999 * Discard retransmitted tokens
4000 */
4001 if (sq_lte_compare (token->token_seq, instance->my_token_seq)) {
4002 return (0); /* discard token */
4003 }
4004
4005 /*
4006 * Token is valid so trigger callbacks
4007 */
4008 token_callbacks_execute (instance, TOTEM_CALLBACK_TOKEN_RECEIVED);
4009
4010 last_aru = instance->my_last_aru;
4011 instance->my_last_aru = token->aru;
4012
4013 transmits_allowed = fcc_calculate (instance, token);
4014 mcasted_retransmit = orf_token_rtr (instance, token, &transmits_allowed);
4015
4017 instance->my_token_held == 1 &&
4018 (token->rtr_list_entries > 0 || mcasted_retransmit > 0)) {
4019 instance->my_token_held = 0;
4020 forward_token = 1;
4021 }
4022
4023 fcc_rtr_limit (instance, token, &transmits_allowed);
4024 mcasted_regular = orf_token_mcast (instance, token, transmits_allowed);
4025/*
4026if (mcasted_regular) {
4027printf ("mcasted regular %d\n", mcasted_regular);
4028printf ("token seq %d\n", token->seq);
4029}
4030*/
4031 fcc_token_update (instance, token, mcasted_retransmit +
4033
4034 if (sq_lt_compare (instance->my_aru, token->aru) ||
4035 instance->my_id.nodeid == token->aru_addr ||
4036 token->aru_addr == 0) {
4037
4038 token->aru = instance->my_aru;
4039 if (token->aru == token->seq) {
4040 token->aru_addr = 0;
4041 } else {
4042 token->aru_addr = instance->my_id.nodeid;
4043 }
4044 }
4045 if (token->aru == last_aru && token->aru_addr != 0) {
4046 instance->my_aru_count += 1;
4047 } else {
4048 instance->my_aru_count = 0;
4049 }
4050
4051 /*
4052 * We really don't follow specification there. In specification, OTHER nodes
4053 * detect failure of one node (based on aru_count) and my_id IS NEVER added
4054 * to failed list (so node never mark itself as failed)
4055 */
4056 if (instance->my_aru_count > instance->totem_config->fail_to_recv_const &&
4057 token->aru_addr == instance->my_id.nodeid) {
4058
4060 "FAILED TO RECEIVE");
4061
4062 instance->failed_to_recv = 1;
4063
4064 memb_set_merge (&instance->my_id, 1,
4065 instance->my_failed_list,
4066 &instance->my_failed_list_entries);
4067
4068 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_FAILED_TO_RECEIVE);
4069 } else {
4070 instance->my_token_seq = token->token_seq;
4071 token->token_seq += 1;
4072
4073 if (instance->memb_state == MEMB_STATE_RECOVERY) {
4074 /*
4075 * instance->my_aru == instance->my_high_seq_received means this processor
4076 * has recovered all messages it can recover
4077 * (ie: its retrans queue is empty)
4078 */
4079 if (cs_queue_is_empty (&instance->retrans_message_queue) == 0) {
4080
4081 if (token->retrans_flg == 0) {
4082 token->retrans_flg = 1;
4083 instance->my_set_retrans_flg = 1;
4084 }
4085 } else
4086 if (token->retrans_flg == 1 && instance->my_set_retrans_flg) {
4087 token->retrans_flg = 0;
4088 instance->my_set_retrans_flg = 0;
4089 }
4091 "token retrans flag is %d my set retrans flag%d retrans queue empty %d count %d, aru %x",
4092 token->retrans_flg, instance->my_set_retrans_flg,
4093 cs_queue_is_empty (&instance->retrans_message_queue),
4094 instance->my_retrans_flg_count, token->aru);
4095 if (token->retrans_flg == 0) {
4096 instance->my_retrans_flg_count += 1;
4097 } else {
4098 instance->my_retrans_flg_count = 0;
4099 }
4100 if (instance->my_retrans_flg_count == 2) {
4101 instance->my_install_seq = token->seq;
4102 }
4104 "install seq %x aru %x high seq received %x",
4105 instance->my_install_seq, instance->my_aru, instance->my_high_seq_received);
4106 if (instance->my_retrans_flg_count >= 2 &&
4107 instance->my_received_flg == 0 &&
4108 sq_lte_compare (instance->my_install_seq, instance->my_aru)) {
4109 instance->my_received_flg = 1;
4110 instance->my_deliver_memb_entries = instance->my_trans_memb_entries;
4111 memcpy (instance->my_deliver_memb_list, instance->my_trans_memb_list,
4112 sizeof (struct totem_ip_address) * instance->my_trans_memb_entries);
4113 }
4114 if (instance->my_retrans_flg_count >= 3 &&
4115 sq_lte_compare (instance->my_install_seq, token->aru)) {
4116 instance->my_rotation_counter += 1;
4117 } else {
4118 instance->my_rotation_counter = 0;
4119 }
4120 if (instance->my_rotation_counter == 2) {
4122 "retrans flag count %x token aru %x install seq %x aru %x %x",
4123 instance->my_retrans_flg_count, token->aru, instance->my_install_seq,
4124 instance->my_aru, token->seq);
4125
4126 memb_state_operational_enter (instance);
4127 instance->my_rotation_counter = 0;
4128 instance->my_retrans_flg_count = 0;
4129 }
4130 }
4131
4133 token_send (instance, token, forward_token);
4134
4135#ifdef GIVEINFO
4137 tv_diff = tv_current - tv_old;
4138 tv_old = tv_current;
4140 "I held %0.4f ms",
4141 tv_diff / (float)QB_TIME_NS_IN_MSEC);
4142#endif
4143 if (instance->memb_state == MEMB_STATE_OPERATIONAL) {
4144 messages_deliver_to_app (instance, 0,
4145 instance->my_high_seq_received);
4146 }
4147
4148 /*
4149 * Deliver messages after token has been transmitted
4150 * to improve performance
4151 */
4152 reset_token_timeout (instance); // REVIEWED
4153 reset_token_retransmit_timeout (instance); // REVIEWED
4154 if (instance->my_id.nodeid == instance->my_ring_id.rep &&
4155 instance->my_token_held == 1) {
4156
4157 start_token_hold_retransmit_timeout (instance);
4158 }
4159
4160 token_callbacks_execute (instance, TOTEM_CALLBACK_TOKEN_SENT);
4161 }
4162 break;
4163 }
4164
4165 if ((forward_token)
4166 && instance->use_heartbeat) {
4167 reset_heartbeat_timeout(instance);
4168 }
4169 else {
4170 cancel_heartbeat_timeout(instance);
4171 }
4172
4173 return (0);
4174}
4175
4176static void messages_deliver_to_app (
4177 struct totemsrp_instance *instance,
4178 int skip,
4179 unsigned int end_point)
4180{
4182 unsigned int i;
4183 int res;
4184 struct mcast *mcast_in;
4185 struct mcast mcast_header;
4186 unsigned int range = 0;
4188 unsigned int my_high_delivered_stored = 0;
4190
4191 range = end_point - instance->my_high_delivered;
4192
4193 if (range) {
4195 "Delivering %x to %x", instance->my_high_delivered,
4196 end_point);
4197 }
4200
4201 /*
4202 * Deliver messages in order from rtr queue to pending delivery queue
4203 */
4204 for (i = 1; i <= range; i++) {
4205
4206 void *ptr = 0;
4207
4208 /*
4209 * If out of range of sort queue, stop assembly
4210 */
4211 res = sq_in_range (&instance->regular_sort_queue,
4213 if (res == 0) {
4214 break;
4215 }
4216
4217 res = sq_item_get (&instance->regular_sort_queue,
4219 /*
4220 * If hole, stop assembly
4221 */
4222 if (res != 0 && skip == 0) {
4223 break;
4224 }
4225
4227
4228 if (res != 0) {
4229 continue;
4230
4231 }
4232
4234
4235 mcast_in = sort_queue_item_p->mcast;
4236 assert (mcast_in != (struct mcast *)0xdeadbeef);
4237
4239 if (mcast_in->header.magic != TOTEM_MH_MAGIC) {
4241 mcast_endian_convert (mcast_in, &mcast_header);
4242 } else {
4243 memcpy (&mcast_header, mcast_in, sizeof (struct mcast));
4244 }
4245
4246 aligned_system_from = mcast_header.system_from;
4247
4248 /*
4249 * Skip messages not originated in instance->my_deliver_memb
4250 */
4251 if (skip &&
4252 memb_set_subset (&aligned_system_from,
4253 1,
4254 instance->my_deliver_memb_list,
4255 instance->my_deliver_memb_entries) == 0) {
4256
4258
4259 continue;
4260 }
4261
4262 /*
4263 * Message found
4264 */
4266 "Delivering MCAST message with seq %x to pending delivery queue",
4267 mcast_header.seq);
4268
4269 /*
4270 * Message is locally originated multicast
4271 */
4272 instance->totemsrp_deliver_fn (
4273 mcast_header.header.nodeid,
4274 ((char *)sort_queue_item_p->mcast) + sizeof (struct mcast),
4275 sort_queue_item_p->msg_len - sizeof (struct mcast),
4277 }
4278}
4279
4280/*
4281 * recv message handler called when MCAST message type received
4282 */
4283static int message_handler_mcast (
4284 struct totemsrp_instance *instance,
4285 const void *msg,
4286 size_t msg_len,
4288{
4290 struct sq *sort_queue;
4291 struct mcast mcast_header;
4293
4294 if (check_mcast_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4295 return (0);
4296 }
4297
4299 mcast_endian_convert (msg, &mcast_header);
4300 } else {
4301 memcpy (&mcast_header, msg, sizeof (struct mcast));
4302 }
4303
4304 if (mcast_header.header.encapsulated == MESSAGE_ENCAPSULATED) {
4305 sort_queue = &instance->recovery_sort_queue;
4306 } else {
4307 sort_queue = &instance->regular_sort_queue;
4308 }
4309
4310 assert (msg_len <= FRAME_SIZE_MAX);
4311
4312#ifdef TEST_DROP_MCAST_PERCENTAGE
4313 if (random()%100 < TEST_DROP_MCAST_PERCENTAGE) {
4314 return (0);
4315 }
4316#endif
4317
4318 /*
4319 * If the message is foreign execute the switch below
4320 */
4321 if (memcmp (&instance->my_ring_id, &mcast_header.ring_id,
4322 sizeof (struct memb_ring_id)) != 0) {
4323
4324 aligned_system_from = mcast_header.system_from;
4325
4326 switch (instance->memb_state) {
4328 memb_set_merge (
4330 instance->my_proc_list, &instance->my_proc_list_entries);
4331 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_OPERATIONAL_STATE);
4332 break;
4333
4334 case MEMB_STATE_GATHER:
4335 if (!memb_set_subset (
4337 1,
4338 instance->my_proc_list,
4339 instance->my_proc_list_entries)) {
4340
4341 memb_set_merge (&aligned_system_from, 1,
4342 instance->my_proc_list, &instance->my_proc_list_entries);
4343 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_GATHER_STATE);
4344 return (0);
4345 }
4346 break;
4347
4348 case MEMB_STATE_COMMIT:
4349 /* discard message */
4350 instance->stats.rx_msg_dropped++;
4351 break;
4352
4354 /* discard message */
4355 instance->stats.rx_msg_dropped++;
4356 break;
4357 }
4358 return (0);
4359 }
4360
4362 "Received ringid (" CS_PRI_RING_ID ") seq %x",
4363 mcast_header.ring_id.rep,
4364 (uint64_t)mcast_header.ring_id.seq,
4365 mcast_header.seq);
4366
4367 /*
4368 * Add mcast message to rtr queue if not already in rtr queue
4369 * otherwise free io vectors
4370 */
4371 if (msg_len > 0 && msg_len <= FRAME_SIZE_MAX &&
4372 sq_in_range (sort_queue, mcast_header.seq) &&
4373 sq_item_inuse (sort_queue, mcast_header.seq) == 0) {
4374
4375 /*
4376 * Allocate new multicast memory block
4377 */
4378// TODO LEAK
4379 sort_queue_item.mcast = totemsrp_buffer_alloc (instance);
4380 if (sort_queue_item.mcast == NULL) {
4381 return (-1); /* error here is corrected by the algorithm */
4382 }
4383 memcpy (sort_queue_item.mcast, msg, msg_len);
4384 sort_queue_item.msg_len = msg_len;
4385
4386 if (sq_lt_compare (instance->my_high_seq_received,
4387 mcast_header.seq)) {
4388 instance->my_high_seq_received = mcast_header.seq;
4389 }
4390
4391 sq_item_add (sort_queue, &sort_queue_item, mcast_header.seq);
4392 }
4393
4394 update_aru (instance);
4395 if (instance->memb_state == MEMB_STATE_OPERATIONAL) {
4396 messages_deliver_to_app (instance, 0, instance->my_high_seq_received);
4397 }
4398
4399/* TODO remove from retrans message queue for old ring in recovery state */
4400 return (0);
4401}
4402
4403static int message_handler_memb_merge_detect (
4404 struct totemsrp_instance *instance,
4405 const void *msg,
4406 size_t msg_len,
4408{
4411
4412 if (check_memb_merge_detect_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4413 return (0);
4414 }
4415
4417 memb_merge_detect_endian_convert (msg, &memb_merge_detect);
4418 } else {
4420 sizeof (struct memb_merge_detect));
4421 }
4422
4423 /*
4424 * do nothing if this is a merge detect from this configuration
4425 */
4426 if (memcmp (&instance->my_ring_id, &memb_merge_detect.ring_id,
4427 sizeof (struct memb_ring_id)) == 0) {
4428
4429 return (0);
4430 }
4431
4433
4434 /*
4435 * Execute merge operation
4436 */
4437 switch (instance->memb_state) {
4439 memb_set_merge (&aligned_system_from, 1,
4440 instance->my_proc_list, &instance->my_proc_list_entries);
4441 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_MERGE_DURING_OPERATIONAL_STATE);
4442 break;
4443
4444 case MEMB_STATE_GATHER:
4445 if (!memb_set_subset (
4447 1,
4448 instance->my_proc_list,
4449 instance->my_proc_list_entries)) {
4450
4451 memb_set_merge (&aligned_system_from, 1,
4452 instance->my_proc_list, &instance->my_proc_list_entries);
4453 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_MERGE_DURING_GATHER_STATE);
4454 return (0);
4455 }
4456 break;
4457
4458 case MEMB_STATE_COMMIT:
4459 /* do nothing in commit */
4460 break;
4461
4463 /* do nothing in recovery */
4464 break;
4465 }
4466 return (0);
4467}
4468
4469static void memb_join_process (
4470 struct totemsrp_instance *instance,
4471 const struct memb_join *memb_join)
4472{
4473 struct srp_addr *proc_list;
4474 struct srp_addr *failed_list;
4475 int gather_entered = 0;
4479
4483
4484 log_printf(instance->totemsrp_log_level_trace, "memb_join_process");
4485 memb_set_log(instance, instance->totemsrp_log_level_trace,
4486 "proclist", proc_list, memb_join->proc_list_entries);
4487 memb_set_log(instance, instance->totemsrp_log_level_trace,
4489 memb_set_log(instance, instance->totemsrp_log_level_trace,
4490 "my_proclist", instance->my_proc_list, instance->my_proc_list_entries);
4491 memb_set_log(instance, instance->totemsrp_log_level_trace,
4492 "my_faillist", instance->my_failed_list, instance->my_failed_list_entries);
4493
4495 if (instance->flushing) {
4498 "Discarding LEAVE message during flush, nodeid=" CS_PRI_NODE_ID,
4500 if (memb_join->failed_list_entries > 0) {
4501 my_leave_memb_set(instance, failed_list[memb_join->failed_list_entries - 1 ].nodeid);
4502 }
4503 } else {
4505 "Discarding JOIN message during flush, nodeid=" CS_PRI_NODE_ID, memb_join->header.nodeid);
4506 }
4507 return;
4508 } else {
4511 "Received LEAVE message from " CS_PRI_NODE_ID, memb_join->failed_list_entries > 0 ? failed_list[memb_join->failed_list_entries - 1 ].nodeid : LEAVE_DUMMY_NODEID);
4512 if (memb_join->failed_list_entries > 0) {
4513 my_leave_memb_set(instance, failed_list[memb_join->failed_list_entries - 1 ].nodeid);
4514 }
4515 }
4516 }
4517
4518 }
4519
4520 if (memb_set_equal (proc_list,
4522 instance->my_proc_list,
4523 instance->my_proc_list_entries) &&
4524
4525 memb_set_equal (failed_list,
4527 instance->my_failed_list,
4528 instance->my_failed_list_entries)) {
4529
4531 memb_consensus_set (instance, &aligned_system_from);
4532 }
4533
4534 if (memb_consensus_agreed (instance) && instance->failed_to_recv == 1) {
4535 instance->failed_to_recv = 0;
4536 instance->my_proc_list[0] = instance->my_id;
4537 instance->my_proc_list_entries = 1;
4538 instance->my_failed_list_entries = 0;
4539
4540 memb_state_commit_token_create (instance);
4541
4542 memb_state_commit_enter (instance);
4543 return;
4544 }
4545 if (memb_consensus_agreed (instance) &&
4546 memb_lowest_in_config (instance)) {
4547
4548 memb_state_commit_token_create (instance);
4549
4550 memb_state_commit_enter (instance);
4551 } else {
4552 goto out;
4553 }
4554 } else
4555 if (memb_set_subset (proc_list,
4557 instance->my_proc_list,
4558 instance->my_proc_list_entries) &&
4559
4560 memb_set_subset (failed_list,
4562 instance->my_failed_list,
4563 instance->my_failed_list_entries)) {
4564
4565 goto out;
4566 } else
4567 if (memb_set_subset (&aligned_system_from, 1,
4568 instance->my_failed_list, instance->my_failed_list_entries)) {
4569
4570 goto out;
4571 } else {
4572 memb_set_merge (proc_list,
4574 instance->my_proc_list, &instance->my_proc_list_entries);
4575
4576 if (memb_set_subset (
4577 &instance->my_id, 1,
4579
4580 memb_set_merge (
4582 instance->my_failed_list, &instance->my_failed_list_entries);
4583 } else {
4584 if (memb_set_subset (
4586 instance->my_memb_list,
4587 instance->my_memb_entries)) {
4588
4589 if (memb_set_subset (
4591 instance->my_failed_list,
4592 instance->my_failed_list_entries) == 0) {
4593
4594 memb_set_merge (failed_list,
4596 instance->my_failed_list, &instance->my_failed_list_entries);
4597 } else {
4598 memb_set_subtract (fail_minus_memb,
4602 instance->my_memb_list,
4603 instance->my_memb_entries);
4604
4605 memb_set_merge (fail_minus_memb,
4607 instance->my_failed_list,
4608 &instance->my_failed_list_entries);
4609 }
4610 }
4611 }
4612 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_MERGE_DURING_JOIN);
4613 gather_entered = 1;
4614 }
4615
4616out:
4617 if (gather_entered == 0 &&
4618 instance->memb_state == MEMB_STATE_OPERATIONAL) {
4619
4620 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_JOIN_DURING_OPERATIONAL_STATE);
4621 }
4622}
4623
4624static void memb_join_endian_convert (const struct memb_join *in, struct memb_join *out)
4625{
4626 int i;
4627 struct srp_addr *in_proc_list;
4628 struct srp_addr *in_failed_list;
4629 struct srp_addr *out_proc_list;
4630 struct srp_addr *out_failed_list;
4631
4632 out->header.magic = TOTEM_MH_MAGIC;
4633 out->header.version = TOTEM_MH_VERSION;
4634 out->header.type = in->header.type;
4635 out->header.nodeid = swab32 (in->header.nodeid);
4636 out->system_from = srp_addr_endian_convert(in->system_from);
4637 out->proc_list_entries = swab32 (in->proc_list_entries);
4638 out->failed_list_entries = swab32 (in->failed_list_entries);
4639 out->ring_seq = swab64 (in->ring_seq);
4640
4641 in_proc_list = (struct srp_addr *)in->end_of_memb_join;
4642 in_failed_list = in_proc_list + out->proc_list_entries;
4643 out_proc_list = (struct srp_addr *)out->end_of_memb_join;
4644 out_failed_list = out_proc_list + out->proc_list_entries;
4645
4646 for (i = 0; i < out->proc_list_entries; i++) {
4647 out_proc_list[i] = srp_addr_endian_convert (in_proc_list[i]);
4648 }
4649 for (i = 0; i < out->failed_list_entries; i++) {
4650 out_failed_list[i] = srp_addr_endian_convert (in_failed_list[i]);
4651 }
4652}
4653
4654static void memb_commit_token_endian_convert (const struct memb_commit_token *in, struct memb_commit_token *out)
4655{
4656 int i;
4657 struct srp_addr *in_addr = (struct srp_addr *)in->end_of_commit_token;
4658 struct srp_addr *out_addr = (struct srp_addr *)out->end_of_commit_token;
4661
4662 out->header.magic = TOTEM_MH_MAGIC;
4663 out->header.version = TOTEM_MH_VERSION;
4664 out->header.type = in->header.type;
4665 out->header.nodeid = swab32 (in->header.nodeid);
4666 out->token_seq = swab32 (in->token_seq);
4667 out->ring_id.rep = swab32(in->ring_id.rep);
4668 out->ring_id.seq = swab64 (in->ring_id.seq);
4669 out->retrans_flg = swab32 (in->retrans_flg);
4670 out->memb_index = swab32 (in->memb_index);
4671 out->addr_entries = swab32 (in->addr_entries);
4672
4673 in_memb_list = (struct memb_commit_token_memb_entry *)(in_addr + out->addr_entries);
4674 out_memb_list = (struct memb_commit_token_memb_entry *)(out_addr + out->addr_entries);
4675 for (i = 0; i < out->addr_entries; i++) {
4676 out_addr[i] = srp_addr_endian_convert (in_addr[i]);
4677
4678 /*
4679 * Only convert the memb entry if it has been set
4680 */
4681 if (in_memb_list[i].ring_id.rep != 0) {
4682 out_memb_list[i].ring_id.rep = swab32(in_memb_list[i].ring_id.rep);
4683
4684 out_memb_list[i].ring_id.seq =
4687 out_memb_list[i].high_delivered = swab32 (in_memb_list[i].high_delivered);
4688 out_memb_list[i].received_flg = swab32 (in_memb_list[i].received_flg);
4689 }
4690 }
4691}
4692
4693static void orf_token_endian_convert (const struct orf_token *in, struct orf_token *out)
4694{
4695 int i;
4696
4697 out->header.magic = TOTEM_MH_MAGIC;
4698 out->header.version = TOTEM_MH_VERSION;
4699 out->header.type = in->header.type;
4700 out->header.nodeid = swab32 (in->header.nodeid);
4701 out->seq = swab32 (in->seq);
4702 out->token_seq = swab32 (in->token_seq);
4703 out->aru = swab32 (in->aru);
4704 out->ring_id.rep = swab32(in->ring_id.rep);
4705 out->aru_addr = swab32(in->aru_addr);
4706 out->ring_id.seq = swab64 (in->ring_id.seq);
4707 out->fcc = swab32 (in->fcc);
4708 out->backlog = swab32 (in->backlog);
4709 out->retrans_flg = swab32 (in->retrans_flg);
4710 out->rtr_list_entries = swab32 (in->rtr_list_entries);
4711 for (i = 0; i < out->rtr_list_entries; i++) {
4712 out->rtr_list[i].ring_id.rep = swab32(in->rtr_list[i].ring_id.rep);
4713 out->rtr_list[i].ring_id.seq = swab64 (in->rtr_list[i].ring_id.seq);
4714 out->rtr_list[i].seq = swab32 (in->rtr_list[i].seq);
4715 }
4716}
4717
4718static void mcast_endian_convert (const struct mcast *in, struct mcast *out)
4719{
4720 out->header.magic = TOTEM_MH_MAGIC;
4721 out->header.version = TOTEM_MH_VERSION;
4722 out->header.type = in->header.type;
4723 out->header.nodeid = swab32 (in->header.nodeid);
4724 out->header.encapsulated = in->header.encapsulated;
4725
4726 out->seq = swab32 (in->seq);
4727 out->this_seqno = swab32 (in->this_seqno);
4728 out->ring_id.rep = swab32(in->ring_id.rep);
4729 out->ring_id.seq = swab64 (in->ring_id.seq);
4730 out->node_id = swab32 (in->node_id);
4731 out->guarantee = swab32 (in->guarantee);
4732 out->system_from = srp_addr_endian_convert(in->system_from);
4733}
4734
4735static void memb_merge_detect_endian_convert (
4736 const struct memb_merge_detect *in,
4737 struct memb_merge_detect *out)
4738{
4739 out->header.magic = TOTEM_MH_MAGIC;
4740 out->header.version = TOTEM_MH_VERSION;
4741 out->header.type = in->header.type;
4742 out->header.nodeid = swab32 (in->header.nodeid);
4743 out->ring_id.rep = swab32(in->ring_id.rep);
4744 out->ring_id.seq = swab64 (in->ring_id.seq);
4745 out->system_from = srp_addr_endian_convert (in->system_from);
4746}
4747
4748static int ignore_join_under_operational (
4749 struct totemsrp_instance *instance,
4750 const struct memb_join *memb_join)
4751{
4752 struct srp_addr *proc_list;
4753 struct srp_addr *failed_list;
4754 unsigned long long ring_seq;
4756
4761
4762 if (memb_set_subset (&instance->my_id, 1,
4764 return (1);
4765 }
4766
4767 /*
4768 * In operational state, my_proc_list is exactly the same as
4769 * my_memb_list.
4770 */
4771 if ((memb_set_subset (&aligned_system_from, 1,
4772 instance->my_memb_list, instance->my_memb_entries)) &&
4773 (ring_seq < instance->my_ring_id.seq)) {
4774 return (1);
4775 }
4776
4777 return (0);
4778}
4779
4780static int message_handler_memb_join (
4781 struct totemsrp_instance *instance,
4782 const void *msg,
4783 size_t msg_len,
4785{
4786 const struct memb_join *memb_join;
4787 struct memb_join *memb_join_convert = alloca (msg_len);
4789
4790 if (check_memb_join_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4791 return (0);
4792 }
4793
4796 memb_join_endian_convert (msg, memb_join_convert);
4797
4798 } else {
4799 memb_join = msg;
4800 }
4801
4803
4804 /*
4805 * If the process paused because it wasn't scheduled in a timely
4806 * fashion, flush the join messages because they may be queued
4807 * entries
4808 */
4809 if (pause_flush (instance)) {
4810 return (0);
4811 }
4812
4813 if (instance->token_ring_id_seq < memb_join->ring_seq) {
4815 }
4816 switch (instance->memb_state) {
4818 if (!ignore_join_under_operational (instance, memb_join)) {
4819 memb_join_process (instance, memb_join);
4820 }
4821 break;
4822
4823 case MEMB_STATE_GATHER:
4824 memb_join_process (instance, memb_join);
4825 break;
4826
4827 case MEMB_STATE_COMMIT:
4828 if (memb_set_subset (&aligned_system_from,
4829 1,
4830 instance->my_new_memb_list,
4831 instance->my_new_memb_entries) &&
4832
4833 memb_join->ring_seq >= instance->my_ring_id.seq) {
4834
4835 memb_join_process (instance, memb_join);
4836 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_JOIN_DURING_COMMIT_STATE);
4837 }
4838 break;
4839
4841 if (memb_set_subset (&aligned_system_from,
4842 1,
4843 instance->my_new_memb_list,
4844 instance->my_new_memb_entries) &&
4845
4846 memb_join->ring_seq >= instance->my_ring_id.seq) {
4847
4848 memb_join_process (instance, memb_join);
4849 memb_recovery_state_token_loss (instance);
4850 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_JOIN_DURING_RECOVERY);
4851 }
4852 break;
4853 }
4854 return (0);
4855}
4856
4857static int message_handler_memb_commit_token (
4858 struct totemsrp_instance *instance,
4859 const void *msg,
4860 size_t msg_len,
4862{
4866 int sub_entries;
4867
4868 struct srp_addr *addr;
4869
4871 "got commit token");
4872
4873 if (check_memb_commit_token_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4874 return (0);
4875 }
4876
4878 memb_commit_token_endian_convert (msg, memb_commit_token_convert);
4879 } else {
4880 memcpy (memb_commit_token_convert, msg, msg_len);
4881 }
4884
4885#ifdef TEST_DROP_COMMIT_TOKEN_PERCENTAGE
4887 return (0);
4888 }
4889#endif
4890 switch (instance->memb_state) {
4892 /* discard token */
4893 break;
4894
4895 case MEMB_STATE_GATHER:
4896 memb_set_subtract (sub, &sub_entries,
4897 instance->my_proc_list, instance->my_proc_list_entries,
4898 instance->my_failed_list, instance->my_failed_list_entries);
4899
4900 if (memb_set_equal (addr,
4902 sub,
4903 sub_entries) &&
4904
4906 memcpy (instance->commit_token, memb_commit_token, msg_len);
4907 memb_state_commit_enter (instance);
4908 }
4909 break;
4910
4911 case MEMB_STATE_COMMIT:
4912 /*
4913 * If retransmitted commit tokens are sent on this ring
4914 * filter them out and only enter recovery once the
4915 * commit token has traversed the array. This is
4916 * determined by :
4917 * memb_commit_token->memb_index == memb_commit_token->addr_entries) {
4918 */
4919 if (memb_commit_token->ring_id.seq == instance->my_ring_id.seq &&
4921 memb_state_recovery_enter (instance, memb_commit_token);
4922 }
4923 break;
4924
4926 if (instance->my_id.nodeid == instance->my_ring_id.rep) {
4927
4928 /* Filter out duplicated tokens */
4929 if (instance->originated_orf_token) {
4930 break;
4931 }
4932
4933 instance->originated_orf_token = 1;
4934
4936 "Sending initial ORF token");
4937
4938 // TODO convert instead of initiate
4939 orf_token_send_initial (instance);
4940 reset_token_timeout (instance); // REVIEWED
4941 reset_token_retransmit_timeout (instance); // REVIEWED
4942 }
4943 break;
4944 }
4945 return (0);
4946}
4947
4948static int message_handler_token_hold_cancel (
4949 struct totemsrp_instance *instance,
4950 const void *msg,
4951 size_t msg_len,
4953{
4954 const struct token_hold_cancel *token_hold_cancel = msg;
4955
4956 if (check_token_hold_cancel_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4957 return (0);
4958 }
4959
4960 if (memcmp (&token_hold_cancel->ring_id, &instance->my_ring_id,
4961 sizeof (struct memb_ring_id)) == 0) {
4962
4963 instance->my_seq_unchanged = 0;
4964 if (instance->my_ring_id.rep == instance->my_id.nodeid) {
4965 timer_function_token_retransmit_timeout (instance);
4966 }
4967 }
4968 return (0);
4969}
4970
4971static int check_message_header_validity(
4972 void *context,
4973 const void *msg,
4974 unsigned int msg_len,
4975 const struct sockaddr_storage *system_from)
4976{
4977 struct totemsrp_instance *instance = context;
4978 const struct totem_message_header *message_header = msg;
4979 const char *guessed_str;
4980 const char *msg_byte = msg;
4981
4982 if (msg_len < sizeof (struct totem_message_header)) {
4984 "Message received from %s is too short... Ignoring %u.",
4985 totemip_sa_print((struct sockaddr *)system_from), (unsigned int)msg_len);
4986 return (-1);
4987 }
4988
4989 if (message_header->magic != TOTEM_MH_MAGIC &&
4991 /*
4992 * We've received either Knet, old version of Corosync,
4993 * or something else. Do some guessing to display (hopefully)
4994 * helpful message
4995 */
4996 guessed_str = NULL;
4997
4998 if (message_header->magic == 0xFFFF) {
4999 /*
5000 * Corosync 2.2 used header with two UINT8_MAX
5001 */
5002 guessed_str = "Corosync 2.2";
5003 } else if (message_header->magic == 0xFEFE) {
5004 /*
5005 * Corosync 2.3+ used header with two UINT8_MAX - 1
5006 */
5007 guessed_str = "Corosync 2.3+";
5008 } else if (msg_byte[0] == 0x01) {
5009 /*
5010 * Knet has stable1 with first byte of message == 1
5011 */
5012 guessed_str = "unencrypted Kronosnet";
5013 } else if (msg_byte[0] >= 0 && msg_byte[0] <= 5) {
5014 /*
5015 * Unencrypted Corosync 1.x/OpenAIS has first byte
5016 * 0-5. Collision with Knet (but still worth the try)
5017 */
5018 guessed_str = "unencrypted Corosync 2.0/2.1/1.x/OpenAIS";
5019 } else {
5020 /*
5021 * Encrypted Kronosnet packet has a hash at the end of
5022 * the packet and nothing specific at the beginning of the
5023 * packet (just encrypted data).
5024 * Encrypted Corosync 1.x/OpenAIS is quite similar but hash_digest
5025 * is in the beginning of the packet.
5026 *
5027 * So it's not possible to reliably detect either of them.
5028 */
5029 guessed_str = "encrypted Kronosnet/Corosync 2.0/2.1/1.x/OpenAIS or unknown";
5030 }
5031
5033 "Message received from %s has bad magic number (probably sent by %s).. Ignoring",
5035 guessed_str);
5036
5037 return (-1);
5038 }
5039
5040 if (message_header->version != TOTEM_MH_VERSION) {
5042 "Message received from %s has unsupported version %u... Ignoring",
5044 message_header->version);
5045
5046 return (-1);
5047 }
5048
5049 return (0);
5050}
5051
5052
5054 void *context,
5055 const void *msg,
5056 unsigned int msg_len,
5057 const struct sockaddr_storage *system_from)
5058{
5059 struct totemsrp_instance *instance = context;
5060 const struct totem_message_header *message_header = msg;
5061
5062 if (check_message_header_validity(context, msg, msg_len, system_from) == -1) {
5063 return -1;
5064 }
5065
5066 switch (message_header->type) {
5068 instance->stats.orf_token_rx++;
5069 break;
5070 case MESSAGE_TYPE_MCAST:
5071 instance->stats.mcast_rx++;
5072 break;
5074 instance->stats.memb_merge_detect_rx++;
5075 break;
5077 instance->stats.memb_join_rx++;
5078 break;
5080 instance->stats.memb_commit_token_rx++;
5081 break;
5083 instance->stats.token_hold_cancel_rx++;
5084 break;
5085 default:
5087 "Message received from %s has wrong type... ignoring %d.\n",
5089 (int)message_header->type);
5090
5091 instance->stats.rx_msg_dropped++;
5092 return 0;
5093 }
5094 /*
5095 * Handle incoming message
5096 */
5098 instance,
5099 msg,
5100 msg_len,
5101 message_header->magic != TOTEM_MH_MAGIC);
5102}
5103
5105 void *context,
5106 const struct totem_ip_address *interface_addr,
5107 unsigned short ip_port,
5108 unsigned int iface_no)
5109{
5110 struct totemsrp_instance *instance = context;
5111 int res;
5112
5114
5116 instance->totemnet_context,
5118 ip_port,
5119 iface_no);
5120
5121 return (res);
5122}
5123
5124/* Contrary to its name, this only gets called when the interface is enabled */
5126 void *context,
5127 const struct totem_ip_address *iface_addr,
5128 unsigned int iface_no)
5129{
5130 struct totemsrp_instance *instance = context;
5131 int num_interfaces;
5132 int i;
5133 int res = 0;
5134
5135 if (!instance->my_id.nodeid) {
5136 instance->my_id.nodeid = iface_addr->nodeid;
5137 }
5139
5140 if (instance->iface_changes++ == 0) {
5141 instance->memb_ring_id_create_or_load (&instance->my_ring_id, instance->my_id.nodeid);
5142 /*
5143 * Increase the ring_id sequence number. This doesn't follow specification.
5144 * Solves problem with restarted leader node (node with lowest nodeid) before
5145 * rest of the cluster forms new membership and guarantees unique ring_id for
5146 * new singleton configuration.
5147 */
5148 instance->my_ring_id.seq++;
5149
5150 instance->token_ring_id_seq = instance->my_ring_id.seq;
5151 log_printf (
5152 instance->totemsrp_log_level_debug,
5153 "Created or loaded sequence id " CS_PRI_RING_ID " for this ring.",
5154 instance->my_ring_id.rep,
5155 (uint64_t)instance->my_ring_id.seq);
5156
5157 if (instance->totemsrp_service_ready_fn) {
5158 instance->totemsrp_service_ready_fn ();
5159 }
5160
5161 }
5162
5163 num_interfaces = 0;
5164 for (i = 0; i < INTERFACE_MAX; i++) {
5165 if (instance->totem_config->interfaces[i].configured) {
5167 }
5168 }
5169
5170 if (instance->iface_changes >= num_interfaces) {
5171 /* We need to clear orig_interfaces so that 'commit' diffs against nothing */
5172 instance->totem_config->orig_interfaces = malloc (sizeof (struct totem_interface) * INTERFACE_MAX);
5173 assert(instance->totem_config->orig_interfaces != NULL);
5175
5177
5178 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_INTERFACE_CHANGE);
5179 free(instance->totem_config->orig_interfaces);
5180 }
5181 return res;
5182}
5183
5185 totem_config->net_mtu -= 2 * sizeof (struct mcast);
5186}
5187
5189 void *context,
5190 void (*totem_service_ready) (void))
5191{
5192 struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5193
5195}
5196
5198 void *context,
5199 const struct totem_ip_address *member,
5200 int iface_no)
5201{
5202 struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5203 int res;
5204
5205 res = totemnet_member_add (instance->totemnet_context, &instance->my_addrs[iface_no], member, iface_no);
5206
5207 return (res);
5208}
5209
5211 void *context,
5212 const struct totem_ip_address *member,
5213 int iface_no)
5214{
5215 struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5216 int res;
5217
5218 res = totemnet_member_remove (instance->totemnet_context, member, iface_no);
5219
5220 return (res);
5221}
5222
5224{
5225 struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5226
5227 instance->threaded_mode_enabled = 1;
5228}
5229
5230void totemsrp_trans_ack (void *context)
5231{
5232 struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5233
5234 instance->waiting_trans_ack = 0;
5236}
5237
5238
5240{
5241 struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5242 int res;
5243
5245 return (res);
5246}
5247
5249{
5250 struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5251 int res;
5252
5254 return (res);
5255}
5256
5257void totemsrp_stats_clear (void *context, int flags)
5258{
5259 struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5260
5261 memset(&instance->stats, 0, sizeof(totemsrp_stats_t));
5264 }
5265}
5266
/*
 * Run the ORF token timeout handler immediately for the given context,
 * without waiting for the timer to expire.
 *
 * context - opaque pointer handed straight to
 *           timer_function_orf_token_timeout()
 *
 * NOTE(review): judging by the name this is meant to push the instance into
 * the gather state; the handler is not visible here - confirm.
 */
void totemsrp_force_gather (void *context)
{
	timer_function_orf_token_timeout (context);
}
totem_configuration_type
The totem_configuration_type enum.
Definition coroapi.h:132
@ TOTEM_CONFIGURATION_REGULAR
Definition coroapi.h:133
@ TOTEM_CONFIGURATION_TRANSITIONAL
Definition coroapi.h:134
#define INTERFACE_MAX
Definition coroapi.h:88
#define MESSAGE_QUEUE_MAX
Definition coroapi.h:98
unsigned int nodeid
Definition coroapi.h:0
unsigned char addr[TOTEMIP_ADDRLEN]
Definition coroapi.h:2
totem_callback_token_type
The totem_callback_token_type enum.
Definition coroapi.h:142
@ TOTEM_CALLBACK_TOKEN_SENT
Definition coroapi.h:144
@ TOTEM_CALLBACK_TOKEN_RECEIVED
Definition coroapi.h:143
#define PROCESSOR_COUNT_MAX
Definition coroapi.h:96
#define CS_PRI_RING_ID_SEQ
Definition corotypes.h:61
#define CS_PRI_NODE_ID
Definition corotypes.h:59
#define CS_PRI_RING_ID
Definition corotypes.h:62
uint32_t flags
uint32_t value
icmap_map_t icmap_get_global_map(void)
Return global icmap.
Definition icmap.c:268
#define LOGSYS_LEVEL_DEBUG
Definition logsys.h:76
struct srp_addr addr
Definition totemsrp.c:164
int guarantee
Definition totemsrp.c:190
unsigned int node_id
Definition totemsrp.c:189
struct memb_ring_id ring_id
Definition totemsrp.c:188
struct totem_message_header header
Definition totemsrp.c:184
unsigned int seq
Definition totemsrp.c:186
int this_seqno
Definition totemsrp.c:187
struct srp_addr system_from
Definition totemsrp.c:185
Definition totemsrp.c:243
unsigned int aru
Definition totemsrp.c:245
unsigned int received_flg
Definition totemsrp.c:247
struct memb_ring_id ring_id
Definition totemsrp.c:244
unsigned int high_delivered
Definition totemsrp.c:246
unsigned int retrans_flg
Definition totemsrp.c:255
struct totem_message_header header
Definition totemsrp.c:252
unsigned char end_of_commit_token[0]
Definition totemsrp.c:258
unsigned int token_seq
Definition totemsrp.c:253
struct memb_ring_id ring_id
Definition totemsrp.c:254
struct srp_addr system_from
Definition totemsrp.c:217
struct totem_message_header header
Definition totemsrp.c:216
unsigned char end_of_memb_join[0]
Definition totemsrp.c:221
unsigned long long ring_seq
Definition totemsrp.c:220
unsigned int failed_list_entries
Definition totemsrp.c:219
unsigned int proc_list_entries
Definition totemsrp.c:218
struct totem_message_header header
Definition totemsrp.c:231
struct memb_ring_id ring_id
Definition totemsrp.c:233
struct srp_addr system_from
Definition totemsrp.c:232
The memb_ring_id struct.
Definition coroapi.h:122
unsigned long long seq
Definition coroapi.h:124
unsigned int rep
Definition totem.h:150
int(* handler_functions[6])(struct totemsrp_instance *instance, const void *msg, size_t msg_len, int endian_conversion_needed)
Definition totemsrp.c:535
unsigned int msg_len
Definition totemsrp.c:269
struct mcast * mcast
Definition totemsrp.c:268
unsigned int backlog
Definition totemsrp.c:207
unsigned int token_seq
Definition totemsrp.c:203
unsigned int aru_addr
Definition totemsrp.c:205
unsigned int fcc
Definition totemsrp.c:208
unsigned int aru
Definition totemsrp.c:204
int rtr_list_entries
Definition totemsrp.c:210
struct rtr_item rtr_list[0]
Definition totemsrp.c:211
int retrans_flg
Definition totemsrp.c:209
unsigned int seq
Definition totemsrp.c:202
struct totem_message_header header
Definition totemsrp.c:201
struct memb_ring_id ring_id
Definition totemsrp.c:206
struct memb_ring_id ring_id
Definition totemsrp.c:195
unsigned int seq
Definition totemsrp.c:196
unsigned int msg_len
Definition totemsrp.c:274
struct mcast * mcast
Definition totemsrp.c:273
The sq struct.
Definition sq.h:43
unsigned int nodeid
Definition totemsrp.c:108
struct qb_list_head list
Definition totemsrp.c:170
int(* callback_fn)(enum totem_callback_token_type type, const void *)
Definition totemsrp.c:171
enum totem_callback_token_type callback_type
Definition totemsrp.c:172
struct totem_message_header header
Definition totemsrp.c:238
struct memb_ring_id ring_id
Definition totemsrp.c:239
unsigned int max_messages
Definition totem.h:220
unsigned int heartbeat_failures_allowed
Definition totem.h:214
unsigned int token_timeout
Definition totem.h:182
unsigned int window_size
Definition totem.h:218
struct totem_logging_configuration totem_logging_configuration
Definition totem.h:208
unsigned int downcheck_timeout
Definition totem.h:200
unsigned int miss_count_const
Definition totem.h:242
struct totem_interface * interfaces
Definition totem.h:165
unsigned int cancel_token_hold_on_retransmit
Definition totem.h:248
unsigned int fail_to_recv_const
Definition totem.h:202
unsigned int merge_timeout
Definition totem.h:198
struct totem_interface * orig_interfaces
Definition totem.h:166
unsigned int net_mtu
Definition totem.h:210
void(* totem_memb_ring_id_create_or_load)(struct memb_ring_id *memb_ring_id, unsigned int nodeid)
Definition totem.h:250
unsigned int token_retransmits_before_loss_const
Definition totem.h:190
unsigned int max_network_delay
Definition totem.h:216
unsigned int seqno_unchanged_const
Definition totem.h:204
unsigned int consensus_timeout
Definition totem.h:196
unsigned int threads
Definition totem.h:212
unsigned int send_join_timeout
Definition totem.h:194
void(* totem_memb_ring_id_store)(const struct memb_ring_id *memb_ring_id, unsigned int nodeid)
Definition totem.h:254
unsigned int token_retransmit_timeout
Definition totem.h:186
unsigned int token_warning
Definition totem.h:184
unsigned int join_timeout
Definition totem.h:192
unsigned int token_hold_timeout
Definition totem.h:188
struct totem_ip_address boundto
Definition totem.h:84
uint8_t configured
Definition totem.h:89
int member_count
Definition totem.h:90
struct totem_ip_address member_list[PROCESSOR_COUNT_MAX]
Definition totem.h:97
struct totem_ip_address mcast_addr
Definition totem.h:85
The totem_ip_address struct.
Definition coroapi.h:111
unsigned int nodeid
Definition coroapi.h:112
unsigned short family
Definition coroapi.h:113
void(* log_printf)(int level, int subsys, const char *function_name, const char *file_name, int file_line, const char *format,...) __attribute__((format(printf, 6, 7)))
Definition totem.h:101
int log_level_security
Definition totem.h:110
unsigned int nodeid
Definition totem.h:131
unsigned short magic
Definition totem.h:127
struct totem_ip_address mcast_address
Definition totemsrp.c:452
totemsrp_stats_t stats
Definition totemsrp.c:516
struct srp_addr my_left_memb_list[PROCESSOR_COUNT_MAX]
Definition totemsrp.c:320
int consensus_list_entries
Definition totemsrp.c:300
int my_merge_detect_timeout_outstanding
Definition totemsrp.c:346
unsigned int my_last_seq
Definition totemsrp.c:496
qb_loop_timer_handle timer_heartbeat_timeout
Definition totemsrp.c:419
unsigned int my_token_seq
Definition totemsrp.c:396
qb_loop_timer_handle memb_timer_state_gather_join_timeout
Definition totemsrp.c:413
struct consensus_list_item consensus_list[PROCESSOR_COUNT_MAX]
Definition totemsrp.c:298
qb_loop_timer_handle memb_timer_state_gather_consensus_timeout
Definition totemsrp.c:415
uint64_t pause_timestamp
Definition totemsrp.c:512
uint32_t threaded_mode_enabled
Definition totemsrp.c:522
struct srp_addr my_memb_list[PROCESSOR_COUNT_MAX]
Definition totemsrp.c:316
void * totemnet_context
Definition totemsrp.c:500
int my_leave_memb_entries
Definition totemsrp.c:338
struct srp_addr my_proc_list[PROCESSOR_COUNT_MAX]
Definition totemsrp.c:308
struct srp_addr my_new_memb_list[PROCESSOR_COUNT_MAX]
Definition totemsrp.c:312
int my_failed_list_entries
Definition totemsrp.c:326
struct srp_addr my_failed_list[PROCESSOR_COUNT_MAX]
Definition totemsrp.c:310
unsigned int use_heartbeat
Definition totemsrp.c:504
void(* memb_ring_id_create_or_load)(struct memb_ring_id *memb_ring_id, unsigned int nodeid)
Definition totemsrp.c:472
enum memb_state memb_state
Definition totemsrp.c:446
qb_loop_timer_handle memb_timer_state_commit_timeout
Definition totemsrp.c:417
struct cs_queue new_message_queue
Definition totemsrp.c:371
int orf_token_retransmit_size
Definition totemsrp.c:394
unsigned int my_high_seq_received
Definition totemsrp.c:354
void(* totemsrp_deliver_fn)(unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required)
Definition totemsrp.c:454
uint32_t orf_token_discard
Definition totemsrp.c:518
struct qb_list_head token_callback_sent_listhead
Definition totemsrp.c:390
unsigned int last_released
Definition totemsrp.c:486
unsigned int set_aru
Definition totemsrp.c:488
int totemsrp_log_level_notice
Definition totemsrp.c:430
struct cs_queue new_message_queue_trans
Definition totemsrp.c:373
int totemsrp_log_level_trace
Definition totemsrp.c:434
char orf_token_retransmit[TOKEN_SIZE_MAX]
Definition totemsrp.c:392
unsigned int my_trc
Definition totemsrp.c:506
struct cs_queue retrans_message_queue
Definition totemsrp.c:375
struct memb_ring_id my_ring_id
Definition totemsrp.c:340
int totemsrp_log_level_error
Definition totemsrp.c:426
void(* totemsrp_waiting_trans_ack_cb_fn)(int waiting_trans_ack)
Definition totemsrp.c:469
unsigned int old_ring_state_high_seq_received
Definition totemsrp.c:494
qb_loop_timer_handle timer_orf_token_hold_retransmit_timeout
Definition totemsrp.c:409
qb_loop_timer_handle timer_pause_timeout
Definition totemsrp.c:401
unsigned int my_high_ring_delivered
Definition totemsrp.c:364
qb_loop_timer_handle timer_orf_token_retransmit_timeout
Definition totemsrp.c:407
struct totem_config * totem_config
Definition totemsrp.c:502
int my_deliver_memb_entries
Definition totemsrp.c:334
void(* totemsrp_service_ready_fn)(void)
Definition totemsrp.c:467
int my_trans_memb_entries
Definition totemsrp.c:330
uint32_t originated_orf_token
Definition totemsrp.c:520
void * token_recv_event_handle
Definition totemsrp.c:528
struct sq recovery_sort_queue
Definition totemsrp.c:379
qb_loop_timer_handle timer_orf_token_timeout
Definition totemsrp.c:403
qb_loop_timer_handle timer_merge_detect_timeout
Definition totemsrp.c:411
unsigned int my_leave_memb_list[PROCESSOR_COUNT_MAX]
Definition totemsrp.c:322
void * token_sent_event_handle
Definition totemsrp.c:529
unsigned int my_high_delivered
Definition totemsrp.c:386
int totemsrp_log_level_security
Definition totemsrp.c:424
int totemsrp_log_level_warning
Definition totemsrp.c:428
struct memb_commit_token * commit_token
Definition totemsrp.c:514
char commit_token_storage[40000]
Definition totemsrp.c:530
struct memb_ring_id my_old_ring_id
Definition totemsrp.c:342
struct timeval tv_old
Definition totemsrp.c:498
void(* memb_ring_id_store)(const struct memb_ring_id *memb_ring_id, unsigned int nodeid)
Definition totemsrp.c:476
qb_loop_t * totemsrp_poll_handle
Definition totemsrp.c:450
unsigned int my_install_seq
Definition totemsrp.c:356
qb_loop_timer_handle timer_orf_token_warning
Definition totemsrp.c:405
struct srp_addr my_trans_memb_list[PROCESSOR_COUNT_MAX]
Definition totemsrp.c:314
struct srp_addr my_id
Definition totemsrp.c:304
unsigned int my_cbl
Definition totemsrp.c:510
struct qb_list_head token_callback_received_listhead
Definition totemsrp.c:388
unsigned int my_last_aru
Definition totemsrp.c:348
unsigned int my_aru
Definition totemsrp.c:384
uint32_t waiting_trans_ack
Definition totemsrp.c:524
void(* totemsrp_log_printf)(int level, int subsys, const char *function, const char *file, int line, const char *format,...) __attribute__((format(printf, 6, 7)))
Definition totemsrp.c:438
void(* totemsrp_confchg_fn)(enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id)
Definition totemsrp.c:460
struct sq regular_sort_queue
Definition totemsrp.c:377
unsigned long long token_ring_id_seq
Definition totemsrp.c:484
struct srp_addr my_deliver_memb_list[PROCESSOR_COUNT_MAX]
Definition totemsrp.c:318
int totemsrp_log_level_debug
Definition totemsrp.c:432
unsigned int my_pbl
Definition totemsrp.c:508
struct totem_ip_address my_addrs[INTERFACE_MAX]
Definition totemsrp.c:306
uint64_t memb_join_tx
Definition totemstats.h:59
uint32_t continuous_gather
Definition totemstats.h:78
uint64_t recovery_entered
Definition totemstats.h:74
uint64_t rx_msg_dropped
Definition totemstats.h:77
uint64_t gather_entered
Definition totemstats.h:70
uint64_t memb_commit_token_rx
Definition totemstats.h:65
uint64_t mcast_retx
Definition totemstats.h:62
uint64_t mcast_tx
Definition totemstats.h:61
uint64_t memb_commit_token_tx
Definition totemstats.h:64
uint64_t operational_token_lost
Definition totemstats.h:69
uint64_t operational_entered
Definition totemstats.h:68
uint64_t gather_token_lost
Definition totemstats.h:71
uint64_t commit_token_lost
Definition totemstats.h:73
uint64_t token_hold_cancel_tx
Definition totemstats.h:66
uint64_t orf_token_rx
Definition totemstats.h:56
totemsrp_token_stats_t token[TOTEM_TOKEN_STATS_MAX]
Definition totemstats.h:90
uint64_t recovery_token_lost
Definition totemstats.h:75
uint64_t commit_entered
Definition totemstats.h:72
uint64_t memb_merge_detect_rx
Definition totemstats.h:58
uint64_t memb_join_rx
Definition totemstats.h:60
uint64_t orf_token_tx
Definition totemstats.h:55
uint64_t memb_merge_detect_tx
Definition totemstats.h:57
uint64_t mcast_rx
Definition totemstats.h:63
uint64_t token_hold_cancel_rx
Definition totemstats.h:67
uint64_t consensus_timeouts
Definition totemstats.h:76
#define swab64(x)
The swab64 macro.
Definition swab.h:65
#define swab16(x)
The swab16 macro.
Definition swab.h:39
#define swab32(x)
The swab32 macro.
Definition swab.h:51
totem_event_type
Definition totem.h:290
#define TOTEM_MH_VERSION
Definition totem.h:124
#define FRAME_SIZE_MAX
Definition totem.h:52
cfg_message_crypto_reconfig_phase_t
Definition totem.h:154
#define TOTEM_NODE_STATUS_STRUCTURE_VERSION
Definition totem.h:264
#define TOTEM_MH_MAGIC
Definition totem.h:123
char type
Definition totem.h:2
int totemconfig_commit_new_params(struct totem_config *totem_config, icmap_map_t map)
const char * totemip_sa_print(const struct sockaddr *sa)
Definition totemip.c:234
void totemip_copy(struct totem_ip_address *addr1, const struct totem_ip_address *addr2)
Definition totemip.c:123
int totemnet_initialize(qb_loop_t *loop_pt, void **net_context, struct totem_config *totem_config, totemsrp_stats_t *stats, void *context, int(*deliver_fn)(void *context, const void *msg, unsigned int msg_len, const struct sockaddr_storage *system_from), int(*iface_change_fn)(void *context, const struct totem_ip_address *iface_address, unsigned int ring_no), void(*mtu_changed)(void *context, int net_mtu), void(*target_set_completed)(void *context))
Definition totemnet.c:317
int totemnet_iface_set(void *net_context, const struct totem_ip_address *interface_addr, unsigned short ip_port, unsigned int iface_no)
Definition totemnet.c:471
int totemnet_member_remove(void *net_context, const struct totem_ip_address *member, int ring_no)
Definition totemnet.c:553
void * totemnet_buffer_alloc(void *net_context)
Definition totemnet.c:367
int totemnet_token_send(void *net_context, const void *msg, unsigned int msg_len)
Definition totemnet.c:414
int totemnet_send_flush(void *net_context)
Definition totemnet.c:404
void totemnet_buffer_release(void *net_context, void *ptr)
Definition totemnet.c:375
int totemnet_mcast_flush_send(void *net_context, const void *msg, unsigned int msg_len)
Definition totemnet.c:426
int totemnet_member_add(void *net_context, const struct totem_ip_address *local, const struct totem_ip_address *member, int ring_no)
Definition totemnet.c:533
int totemnet_finalize(void *net_context)
Definition totemnet.c:306
int totemnet_crypto_set(void *net_context, const char *cipher_type, const char *hash_type)
Definition totemnet.c:292
int totemnet_ifaces_get(void *net_context, char ***status, unsigned int *iface_count)
Definition totemnet.c:497
int totemnet_processor_count_set(void *net_context, int processor_count)
Definition totemnet.c:383
int totemnet_token_target_set(void *net_context, unsigned int nodeid)
Definition totemnet.c:510
int totemnet_recv_flush(void *net_context)
Definition totemnet.c:394
int totemnet_iface_check(void *net_context)
Definition totemnet.c:452
int totemnet_mcast_noflush_send(void *net_context, const void *msg, unsigned int msg_len)
Definition totemnet.c:439
int totemnet_recv_mcast_empty(void *net_context)
Definition totemnet.c:522
int totemnet_nodestatus_get(void *net_context, unsigned int nodeid, struct totem_node_status *node_status)
Definition totemnet.c:484
void totemnet_stats_clear(void *net_context)
Definition totemnet.c:619
int totemnet_reconfigure(void *net_context, struct totem_config *totem_config)
Definition totemnet.c:589
int totemnet_crypto_reconfigure_phase(void *net_context, struct totem_config *totem_config, cfg_message_crypto_reconfig_phase_t phase)
Definition totemnet.c:603
Totem Network interface - also does encryption/decryption.
int totemsrp_my_family_get(void *srp_context)
Definition totemsrp.c:1133
#define SEQNO_START_TOKEN
Definition totemsrp.c:122
int main_iface_change_fn(void *context, const struct totem_ip_address *iface_address, unsigned int iface_no)
Definition totemsrp.c:5125
unsigned long long ring_seq
Definition totemsrp.c:4
#define RETRANSMIT_ENTRIES_MAX
Definition totemsrp.c:100
unsigned int seq
Definition totemsrp.c:2
#define log_printf(level, format, args...)
Definition totemsrp.c:690
void totemsrp_force_gather(void *context)
Definition totemsrp.c:5267
int rtr_list_entries
Definition totemsrp.c:9
void totemsrp_service_ready_register(void *context, void(*totem_service_ready)(void))
Definition totemsrp.c:5188
int totemsrp_initialize(qb_loop_t *poll_handle, void **srp_context, struct totem_config *totem_config, totempg_stats_t *stats, void(*deliver_fn)(unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required), void(*confchg_fn)(enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id), void(*waiting_trans_ack_cb_fn)(int waiting_trans_ack))
Create a protocol instance.
Definition totemsrp.c:818
int totemsrp_callback_token_create(void *srp_context, void **handle_out, enum totem_callback_token_type type, int delete, int(*callback_fn)(enum totem_callback_token_type type, const void *), const void *data)
Definition totemsrp.c:3496
#define RETRANS_MESSAGE_QUEUE_SIZE_MAX
Definition totemsrp.c:97
void totemsrp_threaded_mode_enable(void *context)
Definition totemsrp.c:5223
struct rtr_item rtr_list[0]
Definition totemsrp.c:10
message_type
Definition totemsrp.c:146
@ MESSAGE_TYPE_MEMB_COMMIT_TOKEN
Definition totemsrp.c:151
@ MESSAGE_TYPE_TOKEN_HOLD_CANCEL
Definition totemsrp.c:152
@ MESSAGE_TYPE_ORF_TOKEN
Definition totemsrp.c:147
@ MESSAGE_TYPE_MEMB_JOIN
Definition totemsrp.c:150
@ MESSAGE_TYPE_MEMB_MERGE_DETECT
Definition totemsrp.c:149
@ MESSAGE_TYPE_MCAST
Definition totemsrp.c:148
void totemsrp_net_mtu_adjust(struct totem_config *totem_config)
Definition totemsrp.c:5184
#define TOKEN_SIZE_MAX
Definition totemsrp.c:101
encapsulation_type
Definition totemsrp.c:155
@ MESSAGE_NOT_ENCAPSULATED
Definition totemsrp.c:157
@ MESSAGE_ENCAPSULATED
Definition totemsrp.c:156
unsigned int failed_list_entries
Definition totemsrp.c:3
struct message_handlers totemsrp_message_handlers
Definition totemsrp.c:678
int totemsrp_nodestatus_get(void *srp_context, unsigned int nodeid, struct totem_node_status *node_status)
Definition totemsrp.c:1041
#define LEAVE_DUMMY_NODEID
Definition totemsrp.c:102
#define QUEUE_RTR_ITEMS_SIZE_MAX
Definition totemsrp.c:96
int guarantee
Definition totemsrp.c:6
unsigned int aru
Definition totemsrp.c:3
gather_state_from
Definition totemsrp.c:542
@ TOTEMSRP_GSFROM_THE_CONSENSUS_TIMEOUT_EXPIRED
Definition totemsrp.c:546
@ TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_GATHER_STATE
Definition totemsrp.c:551
@ TOTEMSRP_GSFROM_FAILED_TO_RECEIVE
Definition totemsrp.c:549
@ TOTEMSRP_GSFROM_CONSENSUS_TIMEOUT
Definition totemsrp.c:543
@ TOTEMSRP_GSFROM_MERGE_DURING_OPERATIONAL_STATE
Definition totemsrp.c:552
@ TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_OPERATIONAL_STATE
Definition totemsrp.c:545
@ TOTEMSRP_GSFROM_MERGE_DURING_JOIN
Definition totemsrp.c:554
@ TOTEMSRP_GSFROM_INTERFACE_CHANGE
Definition totemsrp.c:558
@ TOTEMSRP_GSFROM_GATHER_MISSING1
Definition totemsrp.c:544
@ TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_COMMIT_STATE
Definition totemsrp.c:547
@ TOTEMSRP_GSFROM_MAX
Definition totemsrp.c:559
@ TOTEMSRP_GSFROM_JOIN_DURING_COMMIT_STATE
Definition totemsrp.c:556
@ TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_RECOVERY_STATE
Definition totemsrp.c:548
@ TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_OPERATIONAL_STATE
Definition totemsrp.c:550
@ TOTEMSRP_GSFROM_JOIN_DURING_OPERATIONAL_STATE
Definition totemsrp.c:555
@ TOTEMSRP_GSFROM_MERGE_DURING_GATHER_STATE
Definition totemsrp.c:553
@ TOTEMSRP_GSFROM_JOIN_DURING_RECOVERY
Definition totemsrp.c:557
int totemsrp_crypto_set(void *srp_context, const char *cipher_type, const char *hash_type)
Definition totemsrp.c:1108
int totemsrp_avail(void *srp_context)
Return number of available messages that can be queued.
Definition totemsrp.c:2568
void totemsrp_stats_clear(void *context, int flags)
Definition totemsrp.c:5257
int totemsrp_iface_set(void *context, const struct totem_ip_address *interface_addr, unsigned short ip_port, unsigned int iface_no)
Definition totemsrp.c:5104
void totemsrp_finalize(void *srp_context)
Definition totemsrp.c:1026
struct memb_ring_id ring_id
Definition totemsrp.c:4
void totemsrp_trans_ack(void *context)
Definition totemsrp.c:5230
int totemsrp_crypto_reconfigure_phase(void *context, struct totem_config *totem_config, cfg_message_crypto_reconfig_phase_t phase)
Definition totemsrp.c:5248
unsigned int totemsrp_my_nodeid_get(void *srp_context)
Definition totemsrp.c:1122
int addr_entries
Definition totemsrp.c:5
unsigned int backlog
Definition totemsrp.c:6
#define SEQNO_START_MSG
Definition totemsrp.c:121
void totemsrp_event_signal(void *srp_context, enum totem_event_type type, int value)
Definition totemsrp.c:2488
void totemsrp_callback_token_destroy(void *srp_context, void **handle_out)
Definition totemsrp.c:3531
unsigned int received_flg
Definition totemsrp.c:3
struct message_item __attribute__
int main_deliver_fn(void *context, const void *msg, unsigned int msg_len, const struct sockaddr_storage *system_from)
Definition totemsrp.c:5053
int totemsrp_member_add(void *context, const struct totem_ip_address *member, int iface_no)
Definition totemsrp.c:5197
int totemsrp_ifaces_get(void *srp_context, unsigned int nodeid, unsigned int *interface_id, struct totem_ip_address *interfaces, unsigned int interfaces_size, char ***status, unsigned int *iface_count)
Definition totemsrp.c:1070
int totemsrp_reconfigure(void *context, struct totem_config *totem_config)
Definition totemsrp.c:5239
unsigned int high_delivered
Definition totemsrp.c:2
struct srp_addr system_from
Definition totemsrp.c:1
unsigned int proc_list_entries
Definition totemsrp.c:2
const char * gather_state_from_desc[]
Definition totemsrp.c:562
int totemsrp_member_remove(void *context, const struct totem_ip_address *member, int iface_no)
Definition totemsrp.c:5210
int totemsrp_mcast(void *srp_context, struct iovec *iovec, unsigned int iov_len, int guarantee)
Multicast a message.
Definition totemsrp.c:2497
memb_state
Definition totemsrp.c:277
@ MEMB_STATE_GATHER
Definition totemsrp.c:279
@ MEMB_STATE_RECOVERY
Definition totemsrp.c:281
@ MEMB_STATE_COMMIT
Definition totemsrp.c:280
@ MEMB_STATE_OPERATIONAL
Definition totemsrp.c:278
Totem Single Ring Protocol.
#define TOTEMPG_STATS_CLEAR_TRANSPORT
Definition totemstats.h:116
#define TOTEM_TOKEN_STATS_MAX
Definition totemstats.h:89