corosync 3.1.7
totemsrp.c
Go to the documentation of this file.
1/*
2 * Copyright (c) 2003-2006 MontaVista Software, Inc.
3 * Copyright (c) 2006-2018 Red Hat, Inc.
4 *
5 * All rights reserved.
6 *
7 * Author: Steven Dake (sdake@redhat.com)
8 *
9 * This software licensed under BSD license, the text of which follows:
10 *
11 * Redistribution and use in source and binary forms, with or without
12 * modification, are permitted provided that the following conditions are met:
13 *
14 * - Redistributions of source code must retain the above copyright notice,
15 * this list of conditions and the following disclaimer.
16 * - Redistributions in binary form must reproduce the above copyright notice,
17 * this list of conditions and the following disclaimer in the documentation
18 * and/or other materials provided with the distribution.
19 * - Neither the name of the MontaVista Software, Inc. nor the names of its
20 * contributors may be used to endorse or promote products derived from this
21 * software without specific prior written permission.
22 *
23 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
24 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
25 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
26 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
27 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
28 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
29 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
30 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
31 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
32 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
33 * THE POSSIBILITY OF SUCH DAMAGE.
34 */
35
36/*
37 * The first version of this code was based upon Yair Amir's PhD thesis:
38 * http://www.cs.jhu.edu/~yairamir/phd.ps (ch4,5).
39 *
40 * The current version of totemsrp implements the Totem protocol specified in:
41 * http://citeseer.ist.psu.edu/amir95totem.html
42 *
43 * The deviations from the above published protocols are:
44 * - token hold mode where token doesn't rotate on unused ring - reduces cpu
45 * usage on 1.6ghz xeon from 35% to less than 0.1% as measured by top
46 */
47
48#include <config.h>
49
50#include <assert.h>
51#ifdef HAVE_ALLOCA_H
52#include <alloca.h>
53#endif
54#include <sys/mman.h>
55#include <sys/types.h>
56#include <sys/stat.h>
57#include <sys/socket.h>
58#include <netdb.h>
59#include <sys/un.h>
60#include <sys/ioctl.h>
61#include <sys/param.h>
62#include <netinet/in.h>
63#include <arpa/inet.h>
64#include <unistd.h>
65#include <fcntl.h>
66#include <stdlib.h>
67#include <stdio.h>
68#include <errno.h>
69#include <sched.h>
70#include <time.h>
71#include <sys/time.h>
72#include <sys/poll.h>
73#include <sys/uio.h>
74#include <limits.h>
75
76#include <qb/qblist.h>
77#include <qb/qbdefs.h>
78#include <qb/qbutil.h>
79#include <qb/qbloop.h>
80
81#include <corosync/swab.h>
82#include <corosync/sq.h>
83
84#define LOGSYS_UTILS_ONLY 1
85#include <corosync/logsys.h>
86
87#include "totemsrp.h"
88#include "totemnet.h"
89
90#include "icmap.h"
91#include "totemconfig.h"
92
93#include "cs_queue.h"
94
95#define LOCALHOST_IP inet_addr("127.0.0.1")
96#define QUEUE_RTR_ITEMS_SIZE_MAX 16384 /* allow 16384 retransmit items */
97#define RETRANS_MESSAGE_QUEUE_SIZE_MAX 16384 /* allow 16384 messages to be queued */
98#define RECEIVED_MESSAGE_QUEUE_SIZE_MAX 500 /* allow 500 messages to be queued */
99#define MAXIOVS 5
100#define RETRANSMIT_ENTRIES_MAX 30
101#define TOKEN_SIZE_MAX 64000 /* bytes */
102#define LEAVE_DUMMY_NODEID 0
103
104/*
105 * SRP address.
106 */
107struct srp_addr {
108 unsigned int nodeid;
109};
110
111/*
112 * Rollover handling:
113 * SEQNO_START_MSG is the starting sequence number after a new configuration
114 * This should remain zero, unless testing overflow in which case
115 * 0x7ffff000 and 0xfffff000 are good starting values.
116 *
117 * SEQNO_START_TOKEN is the starting sequence number after a new configuration
118 * for a token. This should remain zero, unless testing overflow in which
119 * case 0x7fffff00 or 0xffffff00 are good starting values.
120 */
121#define SEQNO_START_MSG 0x0
122#define SEQNO_START_TOKEN 0x0
123
124/*
125 * These can be used to test different rollover points
126 * #define SEQNO_START_MSG 0xfffffe00
127 * #define SEQNO_START_TOKEN 0xfffffe00
128 */
129
130/*
131 * These can be used to test the error recovery algorithms
132 * #define TEST_DROP_ORF_TOKEN_PERCENTAGE 30
133 * #define TEST_DROP_COMMIT_TOKEN_PERCENTAGE 30
134 * #define TEST_DROP_MCAST_PERCENTAGE 50
135 * #define TEST_RECOVERY_MSG_COUNT 300
136 */
137
138/*
139 * we compare incoming messages to determine if their endian is
140 * different - if so convert them
141 *
142 * do not change
143 */
144#define ENDIAN_LOCAL 0xff22
145
147 MESSAGE_TYPE_ORF_TOKEN = 0, /* Ordering, Reliability, Flow (ORF) control Token */
148 MESSAGE_TYPE_MCAST = 1, /* ring ordered multicast message */
149 MESSAGE_TYPE_MEMB_MERGE_DETECT = 2, /* merge rings if there are available rings */
150 MESSAGE_TYPE_MEMB_JOIN = 3, /* membership join message */
151 MESSAGE_TYPE_MEMB_COMMIT_TOKEN = 4, /* membership commit token */
152 MESSAGE_TYPE_TOKEN_HOLD_CANCEL = 5, /* cancel the holding of the token */
153};
154
159
160/*
161 * New membership algorithm local variables
162 */
165 int set;
166};
167
168
170 struct qb_list_head list;
171 int (*callback_fn) (enum totem_callback_token_type type, const void *);
173 int delete;
174 void *data;
175};
176
177
179 int mcast;
180 int token;
181};
182
183struct mcast {
186 unsigned int seq;
189 unsigned int node_id;
191} __attribute__((packed));
192
193
194struct rtr_item {
196 unsigned int seq;
197}__attribute__((packed));
198
199
200struct orf_token {
202 unsigned int seq;
203 unsigned int token_seq;
204 unsigned int aru;
205 unsigned int aru_addr;
207 unsigned int backlog;
208 unsigned int fcc;
212}__attribute__((packed));
213
214
215struct memb_join {
218 unsigned int proc_list_entries;
220 unsigned long long ring_seq;
221 unsigned char end_of_memb_join[0];
222/*
223 * These parts of the data structure are dynamic:
224 * struct srp_addr proc_list[];
225 * struct srp_addr failed_list[];
226 */
227} __attribute__((packed));
228
229
234} __attribute__((packed));
235
236
240} __attribute__((packed));
241
242
245 unsigned int aru;
246 unsigned int high_delivered;
247 unsigned int received_flg;
248}__attribute__((packed));
249
250
253 unsigned int token_seq;
255 unsigned int retrans_flg;
258 unsigned char end_of_commit_token[0];
259/*
260 * These parts of the data structure are dynamic:
261 *
262 * struct srp_addr addr[PROCESSOR_COUNT_MAX];
263 * struct memb_commit_token_memb_entry memb_list[PROCESSOR_COUNT_MAX];
264 */
265}__attribute__((packed));
266
268 struct mcast *mcast;
269 unsigned int msg_len;
271
273 struct mcast *mcast;
274 unsigned int msg_len;
275};
276
283
286
288
289 /*
290 * Flow control mcasts and remcasts on last and current orf_token
291 */
293
295
297
299
301
303
305
307
309
311
313
315
317
319
321
323
325
327
329
331
333
335
337
339
341
343
345
347
348 unsigned int my_last_aru;
349
351
353
355
356 unsigned int my_install_seq;
357
359
361
363
365
367
368 /*
369 * Queues used to order, deliver, and recover messages
370 */
372
374
376
378
380
381 /*
382 * Received up to and including
383 */
384 unsigned int my_aru;
385
386 unsigned int my_high_delivered;
387
389
390 struct qb_list_head token_callback_sent_listhead;
391
393
395
396 unsigned int my_token_seq;
397
398 /*
399 * Timers
400 */
401 qb_loop_timer_handle timer_pause_timeout;
402
403 qb_loop_timer_handle timer_orf_token_timeout;
404
405 qb_loop_timer_handle timer_orf_token_warning;
406
408
410
411 qb_loop_timer_handle timer_merge_detect_timeout;
412
414
416
418
419 qb_loop_timer_handle timer_heartbeat_timeout;
420
421 /*
422 * Function and data used to log messages
423 */
425
427
429
431
433
435
437
439 int level,
440 int subsys,
441 const char *function,
442 const char *file,
443 int line,
444 const char *format, ...)__attribute__((format(printf, 6, 7)));;
445
447
448//TODO struct srp_addr next_memb;
449
451
453
455 unsigned int nodeid,
456 const void *msg,
457 unsigned int msg_len,
458 int endian_conversion_required);
459
461 enum totem_configuration_type configuration_type,
462 const unsigned int *member_list, size_t member_list_entries,
463 const unsigned int *left_list, size_t left_list_entries,
464 const unsigned int *joined_list, size_t joined_list_entries,
465 const struct memb_ring_id *ring_id);
466
468
471
474 unsigned int nodeid);
475
477 const struct memb_ring_id *memb_ring_id,
478 unsigned int nodeid);
479
481
483
484 unsigned long long token_ring_id_seq;
485
486 unsigned int last_released;
487
488 unsigned int set_aru;
489
491
493
495
496 unsigned int my_last_seq;
497
498 struct timeval tv_old;
499
501
503
504 unsigned int use_heartbeat;
505
506 unsigned int my_trc;
507
508 unsigned int my_pbl;
509
510 unsigned int my_cbl;
511
513
515
517
519
521
523
525
527
531};
532
534 int count;
536 struct totemsrp_instance *instance,
537 const void *msg,
538 size_t msg_len,
539 int endian_conversion_needed);
540};
541
560};
561
562const char* gather_state_from_desc [] = {
563 [TOTEMSRP_GSFROM_CONSENSUS_TIMEOUT] = "consensus timeout",
565 [TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_OPERATIONAL_STATE] = "The token was lost in the OPERATIONAL state.",
566 [TOTEMSRP_GSFROM_THE_CONSENSUS_TIMEOUT_EXPIRED] = "The consensus timeout expired.",
567 [TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_COMMIT_STATE] = "The token was lost in the COMMIT state.",
568 [TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_RECOVERY_STATE] = "The token was lost in the RECOVERY state.",
569 [TOTEMSRP_GSFROM_FAILED_TO_RECEIVE] = "failed to receive",
570 [TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_OPERATIONAL_STATE] = "foreign message in operational state",
571 [TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_GATHER_STATE] = "foreign message in gather state",
572 [TOTEMSRP_GSFROM_MERGE_DURING_OPERATIONAL_STATE] = "merge during operational state",
573 [TOTEMSRP_GSFROM_MERGE_DURING_GATHER_STATE] = "merge during gather state",
574 [TOTEMSRP_GSFROM_MERGE_DURING_JOIN] = "merge during join",
575 [TOTEMSRP_GSFROM_JOIN_DURING_OPERATIONAL_STATE] = "join during operational state",
576 [TOTEMSRP_GSFROM_JOIN_DURING_COMMIT_STATE] = "join during commit state",
577 [TOTEMSRP_GSFROM_JOIN_DURING_RECOVERY] = "join during recovery",
578 [TOTEMSRP_GSFROM_INTERFACE_CHANGE] = "interface change",
579};
580
581/*
582 * forward decls
583 */
584static int message_handler_orf_token (
585 struct totemsrp_instance *instance,
586 const void *msg,
587 size_t msg_len,
588 int endian_conversion_needed);
589
590static int message_handler_mcast (
591 struct totemsrp_instance *instance,
592 const void *msg,
593 size_t msg_len,
594 int endian_conversion_needed);
595
596static int message_handler_memb_merge_detect (
597 struct totemsrp_instance *instance,
598 const void *msg,
599 size_t msg_len,
600 int endian_conversion_needed);
601
602static int message_handler_memb_join (
603 struct totemsrp_instance *instance,
604 const void *msg,
605 size_t msg_len,
606 int endian_conversion_needed);
607
608static int message_handler_memb_commit_token (
609 struct totemsrp_instance *instance,
610 const void *msg,
611 size_t msg_len,
612 int endian_conversion_needed);
613
614static int message_handler_token_hold_cancel (
615 struct totemsrp_instance *instance,
616 const void *msg,
617 size_t msg_len,
618 int endian_conversion_needed);
619
620static void totemsrp_instance_initialize (struct totemsrp_instance *instance);
621
622static void srp_addr_to_nodeid (
623 struct totemsrp_instance *instance,
624 unsigned int *nodeid_out,
625 struct srp_addr *srp_addr_in,
626 unsigned int entries);
627
628static int srp_addr_equal (const struct srp_addr *a, const struct srp_addr *b);
629
630static void memb_leave_message_send (struct totemsrp_instance *instance);
631
632static void token_callbacks_execute (struct totemsrp_instance *instance, enum totem_callback_token_type type);
633static void memb_state_gather_enter (struct totemsrp_instance *instance, enum gather_state_from gather_from);
634static void messages_deliver_to_app (struct totemsrp_instance *instance, int skip, unsigned int end_point);
/*
 * Forward declaration of orf_token_mcast. The second parameter was
 * spelled "oken"; it is the ORF token the function operates on.
 */
static int orf_token_mcast (struct totemsrp_instance *instance, struct orf_token *token,
	int fcc_mcasts_allowed);
637static void messages_free (struct totemsrp_instance *instance, unsigned int token_aru);
638
639static void memb_ring_id_set (struct totemsrp_instance *instance,
640 const struct memb_ring_id *ring_id);
641static void target_set_completed (void *context);
642static void memb_state_commit_token_update (struct totemsrp_instance *instance);
643static void memb_state_commit_token_target_set (struct totemsrp_instance *instance);
644static int memb_state_commit_token_send (struct totemsrp_instance *instance);
645static int memb_state_commit_token_send_recovery (struct totemsrp_instance *instance, struct memb_commit_token *memb_commit_token);
646static void memb_state_commit_token_create (struct totemsrp_instance *instance);
647static int token_hold_cancel_send (struct totemsrp_instance *instance);
648static void orf_token_endian_convert (const struct orf_token *in, struct orf_token *out);
649static void memb_commit_token_endian_convert (const struct memb_commit_token *in, struct memb_commit_token *out);
650static void memb_join_endian_convert (const struct memb_join *in, struct memb_join *out);
651static void mcast_endian_convert (const struct mcast *in, struct mcast *out);
652static void memb_merge_detect_endian_convert (
653 const struct memb_merge_detect *in,
654 struct memb_merge_detect *out);
655static struct srp_addr srp_addr_endian_convert (struct srp_addr in);
656static void timer_function_orf_token_timeout (void *data);
657static void timer_function_orf_token_warning (void *data);
658static void timer_function_pause_timeout (void *data);
659static void timer_function_heartbeat_timeout (void *data);
660static void timer_function_token_retransmit_timeout (void *data);
661static void timer_function_token_hold_retransmit_timeout (void *data);
662static void timer_function_merge_detect_timeout (void *data);
663static void *totemsrp_buffer_alloc (struct totemsrp_instance *instance);
664static void totemsrp_buffer_release (struct totemsrp_instance *instance, void *ptr);
665static const char* gsfrom_to_msg(enum gather_state_from gsfrom);
666
667void main_deliver_fn (
668 void *context,
669 const void *msg,
670 unsigned int msg_len,
671 const struct sockaddr_storage *system_from);
672
674 void *context,
675 const struct totem_ip_address *iface_address,
676 unsigned int iface_no);
677
679 6,
680 {
681 message_handler_orf_token, /* MESSAGE_TYPE_ORF_TOKEN */
682 message_handler_mcast, /* MESSAGE_TYPE_MCAST */
683 message_handler_memb_merge_detect, /* MESSAGE_TYPE_MEMB_MERGE_DETECT */
684 message_handler_memb_join, /* MESSAGE_TYPE_MEMB_JOIN */
685 message_handler_memb_commit_token, /* MESSAGE_TYPE_MEMB_COMMIT_TOKEN */
686 message_handler_token_hold_cancel /* MESSAGE_TYPE_TOKEN_HOLD_CANCEL */
687 }
688};
689
/*
 * log_printf: forward a printf-style message to the instance's logging
 * callback, tagging it with the subsystem id and the calling function,
 * file and line.
 *
 * Requires a variable named `instance` (with totemsrp_log_printf and
 * totemsrp_subsys_id members) to be in scope at the expansion site.
 *
 * NOTE(review): the expansion ends in "while (0);" - the semicolon is part
 * of the macro, so "log_printf (...);" produces an extra empty statement
 * and an unbraced "if (x) log_printf (...); else ..." will not compile.
 * Dropping the semicolon is only safe once every call site is confirmed
 * to supply its own.
 */
690#define log_printf(level, format, args...) \
691do { \
692 instance->totemsrp_log_printf ( \
693 level, instance->totemsrp_subsys_id, \
694 __FUNCTION__, __FILE__, __LINE__, \
695 format, ##args); \
696} while (0);
/*
 * LOGSYS_PERROR: like log_printf, but appends ": <error text> (<err_num>)"
 * and a newline, where the error text is obtained from qb_strerror_r()
 * for err_num. Also requires `instance` to be in scope. Unlike log_printf,
 * this macro does not carry a trailing semicolon.
 */
697#define LOGSYS_PERROR(err_num, level, fmt, args...) \
698do { \
699 char _error_str[LOGSYS_MAX_PERROR_MSG_LEN]; \
700 const char *_error_ptr = qb_strerror_r(err_num, _error_str, sizeof(_error_str)); \
701 instance->totemsrp_log_printf ( \
702 level, instance->totemsrp_subsys_id, \
703 __FUNCTION__, __FILE__, __LINE__, \
704 fmt ": %s (%d)\n", ##args, _error_ptr, err_num); \
705 } while(0)
706
707static const char* gsfrom_to_msg(enum gather_state_from gsfrom)
708{
709 if (gsfrom <= TOTEMSRP_GSFROM_MAX) {
710 return gather_state_from_desc[gsfrom];
711 }
712 else {
713 return "UNKNOWN";
714 }
715}
716
717static void totemsrp_instance_initialize (struct totemsrp_instance *instance)
718{
719 memset (instance, 0, sizeof (struct totemsrp_instance));
720
721 qb_list_init (&instance->token_callback_received_listhead);
722
723 qb_list_init (&instance->token_callback_sent_listhead);
724
725 instance->my_received_flg = 1;
726
727 instance->my_token_seq = SEQNO_START_TOKEN - 1;
728
730
731 instance->set_aru = -1;
732
733 instance->my_aru = SEQNO_START_MSG;
734
736
738
739 instance->orf_token_discard = 0;
740
741 instance->originated_orf_token = 0;
742
743 instance->commit_token = (struct memb_commit_token *)instance->commit_token_storage;
744
745 instance->waiting_trans_ack = 1;
746}
747
748static int pause_flush (struct totemsrp_instance *instance)
749{
750 uint64_t now_msec;
751 uint64_t timestamp_msec;
752 int res = 0;
753
754 now_msec = (qb_util_nano_current_get () / QB_TIME_NS_IN_MSEC);
755 timestamp_msec = instance->pause_timestamp / QB_TIME_NS_IN_MSEC;
756
757 if ((now_msec - timestamp_msec) > (instance->totem_config->token_timeout / 2)) {
759 "Process pause detected for %d ms, flushing membership messages.", (unsigned int)(now_msec - timestamp_msec));
760 /*
761 * -1 indicates an error from recvmsg
762 */
763 do {
765 } while (res == -1);
766 }
767 return (res);
768}
769
770static int token_event_stats_collector (enum totem_callback_token_type type, const void *void_instance)
771{
772 struct totemsrp_instance *instance = (struct totemsrp_instance *)void_instance;
773 uint32_t time_now;
774 unsigned long long nano_secs = qb_util_nano_current_get ();
775
776 time_now = (nano_secs / QB_TIME_NS_IN_MSEC);
777
779 /* incr latest token the index */
780 if (instance->stats.latest_token == (TOTEM_TOKEN_STATS_MAX - 1))
781 instance->stats.latest_token = 0;
782 else
783 instance->stats.latest_token++;
784
785 if (instance->stats.earliest_token == instance->stats.latest_token) {
786 /* we have filled up the array, start overwriting */
787 if (instance->stats.earliest_token == (TOTEM_TOKEN_STATS_MAX - 1))
788 instance->stats.earliest_token = 0;
789 else
790 instance->stats.earliest_token++;
791
792 instance->stats.token[instance->stats.earliest_token].rx = 0;
793 instance->stats.token[instance->stats.earliest_token].tx = 0;
794 instance->stats.token[instance->stats.earliest_token].backlog_calc = 0;
795 }
796
797 instance->stats.token[instance->stats.latest_token].rx = time_now;
798 instance->stats.token[instance->stats.latest_token].tx = 0; /* in case we drop the token */
799 } else {
800 instance->stats.token[instance->stats.latest_token].tx = time_now;
801 }
802 return 0;
803}
804
805static void totempg_mtu_changed(void *context, int net_mtu)
806{
807 struct totemsrp_instance *instance = context;
808
809 instance->totem_config->net_mtu = net_mtu - 2 * sizeof (struct mcast);
810
812 "Net MTU changed to %d, new value is %d",
813 net_mtu, instance->totem_config->net_mtu);
814}
815
816/*
817 * Exported interfaces
818 */
820 qb_loop_t *poll_handle,
821 void **srp_context,
823 totempg_stats_t *stats,
824
825 void (*deliver_fn) (
826 unsigned int nodeid,
827 const void *msg,
828 unsigned int msg_len,
829 int endian_conversion_required),
830
831 void (*confchg_fn) (
832 enum totem_configuration_type configuration_type,
833 const unsigned int *member_list, size_t member_list_entries,
834 const unsigned int *left_list, size_t left_list_entries,
835 const unsigned int *joined_list, size_t joined_list_entries,
836 const struct memb_ring_id *ring_id),
837 void (*waiting_trans_ack_cb_fn) (
838 int waiting_trans_ack))
839{
840 struct totemsrp_instance *instance;
841 int res;
842
843 instance = malloc (sizeof (struct totemsrp_instance));
844 if (instance == NULL) {
845 goto error_exit;
846 }
847
848 totemsrp_instance_initialize (instance);
849
850 instance->totemsrp_waiting_trans_ack_cb_fn = waiting_trans_ack_cb_fn;
852
853 stats->srp = &instance->stats;
854 instance->stats.latest_token = 0;
855 instance->stats.earliest_token = 0;
856
857 instance->totem_config = totem_config;
858
859 /*
860 * Configure logging
861 */
870
871 /*
872 * Configure totem store and load functions
873 */
876
877 /*
878 * Initialize local variables for totemsrp
879 */
881
882 /*
883 * Display totem configuration
884 */
886 "Token Timeout (%d ms) retransmit timeout (%d ms)",
889 uint32_t token_warning_ms = totem_config->token_warning * totem_config->token_timeout / 100;
891 "Token warning every %d ms (%d%% of Token Timeout)",
892 token_warning_ms, totem_config->token_warning);
893 if (token_warning_ms < totem_config->token_retransmit_timeout)
895 "The token warning interval (%d ms) is less than the token retransmit timeout (%d ms) "
896 "which can lead to spurious token warnings. Consider increasing the token_warning parameter.",
897 token_warning_ms, totem_config->token_retransmit_timeout);
898 } else {
900 "Token warnings disabled");
901 }
903 "token hold (%d ms) retransmits before loss (%d retrans)",
906 "join (%d ms) send_join (%d ms) consensus (%d ms) merge (%d ms)",
910
913 "downcheck (%d ms) fail to recv const (%d msgs)",
916 "seqno unchanged const (%d rotations) Maximum network MTU %d", totem_config->seqno_unchanged_const, totem_config->net_mtu);
917
919 "window size per rotation (%d messages) maximum messages per rotation (%d messages)",
921
923 "missed count const (%d messages)",
925
927 "send threads (%d threads)", totem_config->threads);
928
930 "heartbeat_failures_allowed (%d)", totem_config->heartbeat_failures_allowed);
932 "max_network_delay (%d ms)", totem_config->max_network_delay);
933
934
935 cs_queue_init (&instance->retrans_message_queue, RETRANS_MESSAGE_QUEUE_SIZE_MAX,
936 sizeof (struct message_item), instance->threaded_mode_enabled);
937
938 sq_init (&instance->regular_sort_queue,
939 QUEUE_RTR_ITEMS_SIZE_MAX, sizeof (struct sort_queue_item), 0);
940
941 sq_init (&instance->recovery_sort_queue,
942 QUEUE_RTR_ITEMS_SIZE_MAX, sizeof (struct sort_queue_item), 0);
943
944 instance->totemsrp_poll_handle = poll_handle;
945
946 instance->totemsrp_deliver_fn = deliver_fn;
947
948 instance->totemsrp_confchg_fn = confchg_fn;
949 instance->use_heartbeat = 1;
950
951 timer_function_pause_timeout (instance);
952
955 "HeartBeat is Disabled. To enable set heartbeat_failures_allowed > 0");
956 instance->use_heartbeat = 0;
957 }
958
959 if (instance->use_heartbeat) {
960 instance->heartbeat_timeout
963
964 if (instance->heartbeat_timeout >= totem_config->token_timeout) {
966 "total heartbeat_timeout (%d ms) is not less than token timeout (%d ms)",
967 instance->heartbeat_timeout,
970 "heartbeat_timeout = heartbeat_failures_allowed * token_retransmit_timeout + max_network_delay");
972 "heartbeat timeout should be less than the token timeout. Heartbeat is disabled!!");
973 instance->use_heartbeat = 0;
974 }
975 else {
977 "total heartbeat_timeout (%d ms)", instance->heartbeat_timeout);
978 }
979 }
980
981 res = totemnet_initialize (
982 poll_handle,
983 &instance->totemnet_context,
985 stats->srp,
986 instance,
989 totempg_mtu_changed,
990 target_set_completed);
991 if (res == -1) {
992 goto error_exit;
993 }
994
995 instance->my_id.nodeid = instance->totem_config->interfaces[instance->lowest_active_if].boundto.nodeid;
996
997 /*
998 * Must have net_mtu adjusted by totemnet_initialize first
999 */
1000 cs_queue_init (&instance->new_message_queue,
1002 sizeof (struct message_item), instance->threaded_mode_enabled);
1003
1004 cs_queue_init (&instance->new_message_queue_trans,
1006 sizeof (struct message_item), instance->threaded_mode_enabled);
1007
1009 &instance->token_recv_event_handle,
1011 0,
1012 token_event_stats_collector,
1013 instance);
1015 &instance->token_sent_event_handle,
1017 0,
1018 token_event_stats_collector,
1019 instance);
1020 *srp_context = instance;
1021 return (0);
1022
1023error_exit:
1024 return (-1);
1025}
1026
1028 void *srp_context)
1029{
1030 struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1031
1032 memb_leave_message_send (instance);
1034 cs_queue_free (&instance->new_message_queue);
1035 cs_queue_free (&instance->new_message_queue_trans);
1036 cs_queue_free (&instance->retrans_message_queue);
1037 sq_free (&instance->regular_sort_queue);
1038 sq_free (&instance->recovery_sort_queue);
1039 free (instance);
1040}
1041
1043 void *srp_context,
1044 unsigned int nodeid,
1045 struct totem_node_status *node_status)
1046{
1047 struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1048 int i;
1049
1051
1052 /* Fill in 'reachable' here as the lower level UDP[u] layers don't know */
1053 for (i = 0; i < instance->my_proc_list_entries; i++) {
1054 if (instance->my_proc_list[i].nodeid == nodeid) {
1055 node_status->reachable = 1;
1056 }
1057 }
1058
1059 return totemnet_nodestatus_get(instance->totemnet_context, nodeid, node_status);
1060}
1061
1062
1063/*
1064 * Return configured interfaces. interfaces is array of totem_ip addresses allocated by caller,
1065 * with interfaces_size number of items. iface_count is final number of interfaces filled by this
1066 * function.
1067 *
1068 * Function returns 0 on success, otherwise if interfaces array is not big enough, -2 is returned,
1069 * and if interface was not found, -1 is returned.
1070 */
1072 void *srp_context,
1073 unsigned int nodeid,
1074 unsigned int *interface_id,
1075 struct totem_ip_address *interfaces,
1076 unsigned int interfaces_size,
1077 char ***status,
1078 unsigned int *iface_count)
1079{
1080 struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1081 struct totem_ip_address *iface_ptr = interfaces;
1082 int res = 0;
1083 int i,n;
1084 int num_ifs = 0;
1085
1086 memset(interfaces, 0, sizeof(struct totem_ip_address) * interfaces_size);
1087 *iface_count = INTERFACE_MAX;
1088
1089 for (i=0; i<INTERFACE_MAX; i++) {
1090 for (n=0; n < instance->totem_config->interfaces[i].member_count; n++) {
1091 if (instance->totem_config->interfaces[i].configured &&
1092 instance->totem_config->interfaces[i].member_list[n].nodeid == nodeid) {
1093 memcpy(iface_ptr, &instance->totem_config->interfaces[i].member_list[n], sizeof(struct totem_ip_address));
1094 interface_id[num_ifs] = i;
1095 iface_ptr++;
1096 if (++num_ifs > interfaces_size) {
1097 res = -2;
1098 break;
1099 }
1100 }
1101 }
1102 }
1103
1104 totemnet_ifaces_get(instance->totemnet_context, status, iface_count);
1105 *iface_count = num_ifs;
1106 return (res);
1107}
1108
1110 void *srp_context,
1111 const char *cipher_type,
1112 const char *hash_type)
1113{
1114 struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1115 int res;
1116
1117 res = totemnet_crypto_set(instance->totemnet_context, cipher_type, hash_type);
1118
1119 return (res);
1120}
1121
1122
1124 void *srp_context)
1125{
1126 struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1127 unsigned int res;
1128
1129 res = instance->my_id.nodeid;
1130
1131 return (res);
1132}
1133
1135 void *srp_context)
1136{
1137 struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1138 int res;
1139
1140 res = instance->totem_config->interfaces[instance->lowest_active_if].boundto.family;
1141
1142 return (res);
1143}
1144
1145
1146/*
1147 * Set operations for use by the membership algorithm
1148 */
1149static int srp_addr_equal (const struct srp_addr *a, const struct srp_addr *b)
1150{
1151 if (a->nodeid == b->nodeid) {
1152 return 1;
1153 }
1154 return 0;
1155}
1156
1157static void srp_addr_to_nodeid (
1158 struct totemsrp_instance *instance,
1159 unsigned int *nodeid_out,
1160 struct srp_addr *srp_addr_in,
1161 unsigned int entries)
1162{
1163 unsigned int i;
1164
1165 for (i = 0; i < entries; i++) {
1166 nodeid_out[i] = srp_addr_in[i].nodeid;
1167 }
1168}
1169
1170static struct srp_addr srp_addr_endian_convert (struct srp_addr in)
1171{
1172 struct srp_addr res;
1173
1174 res.nodeid = swab32 (in.nodeid);
1175
1176 return (res);
1177}
1178
1179static void memb_consensus_reset (struct totemsrp_instance *instance)
1180{
1181 instance->consensus_list_entries = 0;
1182}
1183
1184static void memb_set_subtract (
1185 struct srp_addr *out_list, int *out_list_entries,
1186 struct srp_addr *one_list, int one_list_entries,
1187 struct srp_addr *two_list, int two_list_entries)
1188{
1189 int found = 0;
1190 int i;
1191 int j;
1192
1193 *out_list_entries = 0;
1194
1195 for (i = 0; i < one_list_entries; i++) {
1196 for (j = 0; j < two_list_entries; j++) {
1197 if (srp_addr_equal (&one_list[i], &two_list[j])) {
1198 found = 1;
1199 break;
1200 }
1201 }
1202 if (found == 0) {
1203 out_list[*out_list_entries] = one_list[i];
1204 *out_list_entries = *out_list_entries + 1;
1205 }
1206 found = 0;
1207 }
1208}
1209
1210/*
1211 * Set consensus for a specific processor
1212 */
1213static void memb_consensus_set (
1214 struct totemsrp_instance *instance,
1215 const struct srp_addr *addr)
1216{
1217 int found = 0;
1218 int i;
1219
1220 for (i = 0; i < instance->consensus_list_entries; i++) {
1221 if (srp_addr_equal(addr, &instance->consensus_list[i].addr)) {
1222 found = 1;
1223 break; /* found entry */
1224 }
1225 }
1226 instance->consensus_list[i].addr = *addr;
1227 instance->consensus_list[i].set = 1;
1228 if (found == 0) {
1229 instance->consensus_list_entries++;
1230 }
1231 return;
1232}
1233
1234/*
1235 * Is consensus set for a specific processor
1236 */
1237static int memb_consensus_isset (
1238 struct totemsrp_instance *instance,
1239 const struct srp_addr *addr)
1240{
1241 int i;
1242
1243 for (i = 0; i < instance->consensus_list_entries; i++) {
1244 if (srp_addr_equal (addr, &instance->consensus_list[i].addr)) {
1245 return (instance->consensus_list[i].set);
1246 }
1247 }
1248 return (0);
1249}
1250
1251/*
1252 * Is consensus agreed upon based upon consensus database
1253 */
1254static int memb_consensus_agreed (
1255 struct totemsrp_instance *instance)
1256{
1257 struct srp_addr token_memb[PROCESSOR_COUNT_MAX];
1258 int token_memb_entries = 0;
1259 int agreed = 1;
1260 int i;
1261
1262 memb_set_subtract (token_memb, &token_memb_entries,
1263 instance->my_proc_list, instance->my_proc_list_entries,
1264 instance->my_failed_list, instance->my_failed_list_entries);
1265
1266 for (i = 0; i < token_memb_entries; i++) {
1267 if (memb_consensus_isset (instance, &token_memb[i]) == 0) {
1268 agreed = 0;
1269 break;
1270 }
1271 }
1272
1273 if (agreed && instance->failed_to_recv == 1) {
1274 /*
1275 * Both nodes agreed on our failure. We don't care how many proc list items left because we
1276 * will create single ring anyway.
1277 */
1278
1279 return (agreed);
1280 }
1281
1282 assert (token_memb_entries >= 1);
1283
1284 return (agreed);
1285}
1286
1287static void memb_consensus_notset (
1288 struct totemsrp_instance *instance,
1289 struct srp_addr *no_consensus_list,
1290 int *no_consensus_list_entries,
1291 struct srp_addr *comparison_list,
1292 int comparison_list_entries)
1293{
1294 int i;
1295
1296 *no_consensus_list_entries = 0;
1297
1298 for (i = 0; i < instance->my_proc_list_entries; i++) {
1299 if (memb_consensus_isset (instance, &instance->my_proc_list[i]) == 0) {
1300 no_consensus_list[*no_consensus_list_entries] = instance->my_proc_list[i];
1301 *no_consensus_list_entries = *no_consensus_list_entries + 1;
1302 }
1303 }
1304}
1305
1306/*
1307 * Is set1 equal to set2 Entries can be in different orders
1308 */
1309static int memb_set_equal (
1310 struct srp_addr *set1, int set1_entries,
1311 struct srp_addr *set2, int set2_entries)
1312{
1313 int i;
1314 int j;
1315
1316 int found = 0;
1317
1318 if (set1_entries != set2_entries) {
1319 return (0);
1320 }
1321 for (i = 0; i < set2_entries; i++) {
1322 for (j = 0; j < set1_entries; j++) {
1323 if (srp_addr_equal (&set1[j], &set2[i])) {
1324 found = 1;
1325 break;
1326 }
1327 }
1328 if (found == 0) {
1329 return (0);
1330 }
1331 found = 0;
1332 }
1333 return (1);
1334}
1335
1336/*
1337 * Is subset fully contained in fullset
1338 */
1339static int memb_set_subset (
1340 const struct srp_addr *subset, int subset_entries,
1341 const struct srp_addr *fullset, int fullset_entries)
1342{
1343 int i;
1344 int j;
1345 int found = 0;
1346
1347 if (subset_entries > fullset_entries) {
1348 return (0);
1349 }
1350 for (i = 0; i < subset_entries; i++) {
1351 for (j = 0; j < fullset_entries; j++) {
1352 if (srp_addr_equal (&subset[i], &fullset[j])) {
1353 found = 1;
1354 }
1355 }
1356 if (found == 0) {
1357 return (0);
1358 }
1359 found = 0;
1360 }
1361 return (1);
1362}
1363/*
1364 * merge subset into fullset taking care not to add duplicates
1365 */
1366static void memb_set_merge (
1367 const struct srp_addr *subset, int subset_entries,
1368 struct srp_addr *fullset, int *fullset_entries)
1369{
1370 int found = 0;
1371 int i;
1372 int j;
1373
1374 for (i = 0; i < subset_entries; i++) {
1375 for (j = 0; j < *fullset_entries; j++) {
1376 if (srp_addr_equal (&fullset[j], &subset[i])) {
1377 found = 1;
1378 break;
1379 }
1380 }
1381 if (found == 0) {
1382 fullset[*fullset_entries] = subset[i];
1383 *fullset_entries = *fullset_entries + 1;
1384 }
1385 found = 0;
1386 }
1387 return;
1388}
1389
1390static void memb_set_and_with_ring_id (
1391 struct srp_addr *set1,
1392 struct memb_ring_id *set1_ring_ids,
1393 int set1_entries,
1394 struct srp_addr *set2,
1395 int set2_entries,
1396 struct memb_ring_id *old_ring_id,
1397 struct srp_addr *and,
1398 int *and_entries)
1399{
1400 int i;
1401 int j;
1402 int found = 0;
1403
1404 *and_entries = 0;
1405
1406 for (i = 0; i < set2_entries; i++) {
1407 for (j = 0; j < set1_entries; j++) {
1408 if (srp_addr_equal (&set1[j], &set2[i])) {
1409 if (memcmp (&set1_ring_ids[j], old_ring_id, sizeof (struct memb_ring_id)) == 0) {
1410 found = 1;
1411 }
1412 break;
1413 }
1414 }
1415 if (found) {
1416 and[*and_entries] = set1[j];
1417 *and_entries = *and_entries + 1;
1418 }
1419 found = 0;
1420 }
1421 return;
1422}
1423
1424static void memb_set_log(
1425 struct totemsrp_instance *instance,
1426 int level,
1427 const char *string,
1428 struct srp_addr *list,
1429 int list_entries)
1430{
1431 char int_buf[32];
1432 char list_str[512];
1433 int i;
1434
1435 memset(list_str, 0, sizeof(list_str));
1436
1437 for (i = 0; i < list_entries; i++) {
1438 if (i == 0) {
1439 snprintf(int_buf, sizeof(int_buf), CS_PRI_NODE_ID, list[i].nodeid);
1440 } else {
1441 snprintf(int_buf, sizeof(int_buf), "," CS_PRI_NODE_ID, list[i].nodeid);
1442 }
1443
1444 if (strlen(list_str) + strlen(int_buf) >= sizeof(list_str)) {
1445 break ;
1446 }
1447 strcat(list_str, int_buf);
1448 }
1449
1450 log_printf(level, "List '%s' contains %d entries: %s", string, list_entries, list_str);
1451}
1452
1453static void my_leave_memb_clear(
1454 struct totemsrp_instance *instance)
1455{
1456 memset(instance->my_leave_memb_list, 0, sizeof(instance->my_leave_memb_list));
1457 instance->my_leave_memb_entries = 0;
1458}
1459
1460static unsigned int my_leave_memb_match(
1461 struct totemsrp_instance *instance,
1462 unsigned int nodeid)
1463{
1464 int i;
1465 unsigned int ret = 0;
1466
1467 for (i = 0; i < instance->my_leave_memb_entries; i++){
1468 if (instance->my_leave_memb_list[i] == nodeid){
1469 ret = nodeid;
1470 break;
1471 }
1472 }
1473 return ret;
1474}
1475
/*
 * Record nodeid in instance->my_leave_memb_list.
 *
 * Nothing is done when nodeid is already in the list. Otherwise it is
 * appended, but only while my_leave_memb_entries is below
 * PROCESSOR_COUNT_MAX - 1; when that limit is reached the else branch
 * runs instead of storing the id.
 */
1476static void my_leave_memb_set(
1477 struct totemsrp_instance *instance,
1478 unsigned int nodeid)
1479{
1480 int i, found = 0;
/* Duplicate check: linear search of the current entries */
1481 for (i = 0; i < instance->my_leave_memb_entries; i++){
1482 if (instance->my_leave_memb_list[i] == nodeid){
1483 found = 1;
1484 break;
1485 }
1486 }
1487 if (found == 1) {
1488 return;
1489 }
1490 if (instance->my_leave_memb_entries < (PROCESSOR_COUNT_MAX - 1)) {
1491 instance->my_leave_memb_list[instance->my_leave_memb_entries] = nodeid;
1492 instance->my_leave_memb_entries++;
1493 } else {
/*
 * NOTE(review): listing line 1494 is absent from this copy; the string
 * below is the tail of a call that starts there (presumably a log call
 * reporting the full list) - confirm against the upstream file.
 */
1495 "Cannot set LEAVE nodeid=" CS_PRI_NODE_ID, nodeid);
1496 }
1497}
1498
1499
1500static void *totemsrp_buffer_alloc (struct totemsrp_instance *instance)
1501{
1502 assert (instance != NULL);
1503 return totemnet_buffer_alloc (instance->totemnet_context);
1504}
1505
/*
 * Release a buffer for this instance. instance must not be NULL
 * (asserted).
 *
 * NOTE(review): listing line 1509, which holds the statement that
 * consumes ptr, is absent from this copy. Presumably it hands ptr back
 * to totemnet, mirroring totemsrp_buffer_alloc - confirm against the
 * upstream file.
 */
1506static void totemsrp_buffer_release (struct totemsrp_instance *instance, void *ptr)
1507{
1508 assert (instance != NULL);
1510}
1511
/*
 * Restart the token retransmit timer: the existing timer is deleted and
 * timer_function_token_retransmit_timeout is scheduled again after
 * totem_config->token_retransmit_timeout * QB_TIME_NS_IN_MSEC, with the
 * new handle stored in instance->timer_orf_token_retransmit_timeout.
 * A failing qb_loop_timer_add is logged at error level.
 *
 * NOTE(review): listing line 1517 (the second argument and closing of
 * the qb_loop_timer_del call) is absent from this copy.
 */
1512static void reset_token_retransmit_timeout (struct totemsrp_instance *instance)
1513{
1514 int32_t res;
1515
1516 qb_loop_timer_del (instance->totemsrp_poll_handle,
1518 res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1519 QB_LOOP_MED,
1520 instance->totem_config->token_retransmit_timeout*QB_TIME_NS_IN_MSEC,
1521 (void *)instance,
1522 timer_function_token_retransmit_timeout,
1523 &instance->timer_orf_token_retransmit_timeout);
1524 if (res != 0) {
1525 log_printf(instance->totemsrp_log_level_error, "reset_token_retransmit_timeout - qb_loop_timer_add error : %d", res);
1526 }
1527
1528}
1529
/*
 * Schedule timer_function_merge_detect_timeout after
 * totem_config->merge_timeout * QB_TIME_NS_IN_MSEC, but only when
 * instance->my_merge_detect_timeout_outstanding is 0. A failing
 * qb_loop_timer_add is logged at error level.
 *
 * NOTE(review): listing line 1545 is absent from this copy; presumably
 * it marks the timeout as outstanding - confirm against the upstream
 * file.
 */
1530static void start_merge_detect_timeout (struct totemsrp_instance *instance)
1531{
1532 int32_t res;
1533
1534 if (instance->my_merge_detect_timeout_outstanding == 0) {
1535 res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1536 QB_LOOP_MED,
1537 instance->totem_config->merge_timeout*QB_TIME_NS_IN_MSEC,
1538 (void *)instance,
1539 timer_function_merge_detect_timeout,
1540 &instance->timer_merge_detect_timeout);
1541 if (res != 0) {
1542 log_printf(instance->totemsrp_log_level_error, "start_merge_detect_timeout - qb_loop_timer_add error : %d", res);
1543 }
1544
1546 }
1547}
1548
/*
 * Delete the merge detect timer (instance->timer_merge_detect_timeout).
 *
 * NOTE(review): listing line 1552 is absent from this copy; presumably
 * it clears the outstanding flag tested by start_merge_detect_timeout -
 * confirm against the upstream file.
 */
1549static void cancel_merge_detect_timeout (struct totemsrp_instance *instance)
1550{
1551 qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_merge_detect_timeout);
1553}
1554
1555/*
1556 * ring_state_* is used to save and restore the sort queue
1557 * state when a recovery operation fails (and enters gather)
1558 */
/*
 * Save the current ring state once: when old_ring_state_saved is 0 it is
 * set to 1, my_ring_id is copied to my_old_ring_id and my_aru is stored
 * in old_ring_state_aru. Further calls do nothing until the saved flag
 * is cleared again.
 *
 * NOTE(review): listing lines 1566-1567 are absent from this copy.
 * Presumably they save the high sequence number and open the log call
 * whose arguments follow - confirm against the upstream file.
 */
1559static void old_ring_state_save (struct totemsrp_instance *instance)
1560{
1561 if (instance->old_ring_state_saved == 0) {
1562 instance->old_ring_state_saved = 1;
1563 memcpy (&instance->my_old_ring_id, &instance->my_ring_id,
1564 sizeof (struct memb_ring_id));
1565 instance->old_ring_state_aru = instance->my_aru;
1568 "Saving state aru %x high seq received %x",
1569 instance->my_aru, instance->my_high_seq_received);
1570 }
1571}
1572
/*
 * Restore my_aru from the value kept in old_ring_state_aru.
 *
 * NOTE(review): listing lines 1576-1577 are absent from this copy.
 * Presumably they restore the high sequence number and open the log
 * call whose arguments follow - confirm against the upstream file.
 */
1573static void old_ring_state_restore (struct totemsrp_instance *instance)
1574{
1575 instance->my_aru = instance->old_ring_state_aru;
1578 "Restoring instance->my_aru %x my high seq received %x",
1579 instance->my_aru, instance->my_high_seq_received);
1580}
1581
/*
 * Clear old_ring_state_saved so the next old_ring_state_save call stores
 * a fresh snapshot.
 *
 * NOTE(review): listing line 1584, the head of the call that takes the
 * string below, is absent from this copy (presumably a log call).
 */
1582static void old_ring_state_reset (struct totemsrp_instance *instance)
1583{
1585 "Resetting old ring state");
1586 instance->old_ring_state_saved = 0;
1587}
1588
1589static void reset_pause_timeout (struct totemsrp_instance *instance)
1590{
1591 int32_t res;
1592
1593 qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_pause_timeout);
1594 res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1595 QB_LOOP_MED,
1596 instance->totem_config->token_timeout * QB_TIME_NS_IN_MSEC / 5,
1597 (void *)instance,
1598 timer_function_pause_timeout,
1599 &instance->timer_pause_timeout);
1600 if (res != 0) {
1601 log_printf(instance->totemsrp_log_level_error, "reset_pause_timeout - qb_loop_timer_add error : %d", res);
1602 }
1603}
1604
1605static void reset_token_warning (struct totemsrp_instance *instance) {
1606 int32_t res;
1607
1608 qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_orf_token_warning);
1609 res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1610 QB_LOOP_MED,
1611 instance->totem_config->token_warning * instance->totem_config->token_timeout / 100 * QB_TIME_NS_IN_MSEC,
1612 (void *)instance,
1613 timer_function_orf_token_warning,
1614 &instance->timer_orf_token_warning);
1615 if (res != 0) {
1616 log_printf(instance->totemsrp_log_level_error, "reset_token_warning - qb_loop_timer_add error : %d", res);
1617 }
1618}
1619
1620static void reset_token_timeout (struct totemsrp_instance *instance) {
1621 int32_t res;
1622
1623 qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_orf_token_timeout);
1624 res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1625 QB_LOOP_MED,
1626 instance->totem_config->token_timeout*QB_TIME_NS_IN_MSEC,
1627 (void *)instance,
1628 timer_function_orf_token_timeout,
1629 &instance->timer_orf_token_timeout);
1630 if (res != 0) {
1631 log_printf(instance->totemsrp_log_level_error, "reset_token_timeout - qb_loop_timer_add error : %d", res);
1632 }
1633
1634 if (instance->totem_config->token_warning)
1635 reset_token_warning(instance);
1636}
1637
1638static void reset_heartbeat_timeout (struct totemsrp_instance *instance) {
1639 int32_t res;
1640
1641 qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_heartbeat_timeout);
1642 res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1643 QB_LOOP_MED,
1644 instance->heartbeat_timeout*QB_TIME_NS_IN_MSEC,
1645 (void *)instance,
1646 timer_function_heartbeat_timeout,
1647 &instance->timer_heartbeat_timeout);
1648 if (res != 0) {
1649 log_printf(instance->totemsrp_log_level_error, "reset_heartbeat_timeout - qb_loop_timer_add error : %d", res);
1650 }
1651}
1652
1653
1654static void cancel_token_warning (struct totemsrp_instance *instance) {
1655 qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_orf_token_warning);
1656}
1657
1658static void cancel_token_timeout (struct totemsrp_instance *instance) {
1659 qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_orf_token_timeout);
1660
1661 if (instance->totem_config->token_warning)
1662 cancel_token_warning(instance);
1663}
1664
1665static void cancel_heartbeat_timeout (struct totemsrp_instance *instance) {
1666 qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_heartbeat_timeout);
1667}
1668
1669static void cancel_token_retransmit_timeout (struct totemsrp_instance *instance)
1670{
1671 qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_orf_token_retransmit_timeout);
1672}
1673
1674static void start_token_hold_retransmit_timeout (struct totemsrp_instance *instance)
1675{
1676 int32_t res;
1677
1678 res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1679 QB_LOOP_MED,
1680 instance->totem_config->token_hold_timeout*QB_TIME_NS_IN_MSEC,
1681 (void *)instance,
1682 timer_function_token_hold_retransmit_timeout,
1683 &instance->timer_orf_token_hold_retransmit_timeout);
1684 if (res != 0) {
1685 log_printf(instance->totemsrp_log_level_error, "start_token_hold_retransmit_timeout - qb_loop_timer_add error : %d", res);
1686 }
1687}
1688
/*
 * Delete a timer via qb_loop_timer_del on the instance poll handle.
 *
 * NOTE(review): listing line 1692, which names the timer handle and
 * closes the call, is absent from this copy. Presumably the handle is
 * the one set by start_token_hold_retransmit_timeout - confirm against
 * the upstream file.
 */
1689static void cancel_token_hold_retransmit_timeout (struct totemsrp_instance *instance)
1690{
1691 qb_loop_timer_del (instance->totemsrp_poll_handle,
1693}
1694
1695static void memb_state_consensus_timeout_expired (
1696 struct totemsrp_instance *instance)
1697{
1698 struct srp_addr no_consensus_list[PROCESSOR_COUNT_MAX];
1699 int no_consensus_list_entries;
1700
1701 instance->stats.consensus_timeouts++;
1702 if (memb_consensus_agreed (instance)) {
1703 memb_consensus_reset (instance);
1704
1705 memb_consensus_set (instance, &instance->my_id);
1706
1707 reset_token_timeout (instance); // REVIEWED
1708 } else {
1709 memb_consensus_notset (
1710 instance,
1711 no_consensus_list,
1712 &no_consensus_list_entries,
1713 instance->my_proc_list,
1714 instance->my_proc_list_entries);
1715
1716 memb_set_merge (no_consensus_list, no_consensus_list_entries,
1717 instance->my_failed_list, &instance->my_failed_list_entries);
1718 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_CONSENSUS_TIMEOUT);
1719 }
1720}
1721
1722static void memb_join_message_send (struct totemsrp_instance *instance);
1723
1724static void memb_merge_detect_transmit (struct totemsrp_instance *instance);
1725
1726/*
1727 * Timers used for various states of the membership algorithm
1728 */
1729static void timer_function_pause_timeout (void *data)
1730{
1731 struct totemsrp_instance *instance = data;
1732
1733 instance->pause_timestamp = qb_util_nano_current_get ();
1734 reset_pause_timeout (instance);
1735}
1736
1737static void memb_recovery_state_token_loss (struct totemsrp_instance *instance)
1738{
1739 old_ring_state_restore (instance);
1740 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_RECOVERY_STATE);
1741 instance->stats.recovery_token_lost++;
1742}
1743
/*
 * Token warning timer callback. data is the totemsrp instance.
 *
 * While totem_config->token_warning is non-zero, the milliseconds since
 * the rx stamp of the latest token statistics entry are computed and the
 * warning timer is restarted; otherwise the warning timer is cancelled.
 *
 * NOTE(review): listing line 1753, the head of the call that takes the
 * "Token has not been received" string, is absent from this copy
 * (presumably a log call). Also note the format uses %d for a value
 * cast to unsigned int.
 */
1744static void timer_function_orf_token_warning (void *data)
1745{
1746 struct totemsrp_instance *instance = data;
1747 uint64_t tv_diff;
1748
1749 /* need to protect against the case where token_warning is set to 0 dynamically */
1750 if (instance->totem_config->token_warning) {
1751 tv_diff = qb_util_nano_current_get () / QB_TIME_NS_IN_MSEC -
1752 instance->stats.token[instance->stats.latest_token].rx;
1754 "Token has not been received in %d ms ", (unsigned int) tv_diff);
1755 reset_token_warning(instance);
1756 } else {
1757 cancel_token_warning(instance);
1758 }
1759}
1760
/*
 * Token loss timer callback. data is the totemsrp instance. Dispatches
 * on instance->memb_state:
 *
 *  - first visible branch (messages mention the OPERATIONAL state):
 *    enter gather and count stats.operational_token_lost
 *  - MEMB_STATE_GATHER: run the consensus timeout handling, enter gather
 *    again and count stats.gather_token_lost
 *  - MEMB_STATE_COMMIT: enter gather and count stats.commit_token_lost
 *  - last branch (message mentions the RECOVERY state): run
 *    memb_recovery_state_token_loss and set orf_token_discard
 *
 * NOTE(review): several listing lines are absent from this copy (1766,
 * 1767, 1769, 1774, 1780, 1789, 1795, 1796): two case labels, the heads
 * of the calls that take the message strings and one further statement.
 * Confirm the exact labels, log levels and the missing statement against
 * the upstream file.
 */
1761static void timer_function_orf_token_timeout (void *data)
1762{
1763 struct totemsrp_instance *instance = data;
1764
1765 switch (instance->memb_state) {
1768 "The token was lost in the OPERATIONAL state.");
1770 "A processor failed, forming new configuration:"
1771 " token timed out (%ums), waiting %ums for consensus.",
1772 instance->totem_config->token_timeout,
1773 instance->totem_config->consensus_timeout);
1775 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_OPERATIONAL_STATE);
1776 instance->stats.operational_token_lost++;
1777 break;
1778
1779 case MEMB_STATE_GATHER:
1781 "The consensus timeout expired (%ums).",
1782 instance->totem_config->consensus_timeout);
1783 memb_state_consensus_timeout_expired (instance);
1784 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_THE_CONSENSUS_TIMEOUT_EXPIRED);
1785 instance->stats.gather_token_lost++;
1786 break;
1787
1788 case MEMB_STATE_COMMIT:
1790 "The token was lost in the COMMIT state.");
1791 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_COMMIT_STATE);
1792 instance->stats.commit_token_lost++;
1793 break;
1794
1797 "The token was lost in the RECOVERY state.");
1798 memb_recovery_state_token_loss (instance);
1799 instance->orf_token_discard = 1;
1800 break;
1801 }
1802}
1803
/*
 * Heartbeat timer callback. data is the totemsrp instance. Expiry is
 * handled exactly like a token loss: the call is forwarded to
 * timer_function_orf_token_timeout with the same data pointer.
 *
 * NOTE(review): listing line 1807, the head of the call that takes the
 * message string below, is absent from this copy (presumably a log
 * call).
 */
1804static void timer_function_heartbeat_timeout (void *data)
1805{
1806 struct totemsrp_instance *instance = data;
1808 "HeartBeat Timer expired Invoking token loss mechanism in state %d ", instance->memb_state);
1809 timer_function_orf_token_timeout(data);
1810}
1811
/*
 * Join timer callback. data is the totemsrp instance.
 *
 * In MEMB_STATE_GATHER and MEMB_STATE_COMMIT a join message is sent and
 * the join timer is re-armed with this same function after
 * totem_config->join_timeout * QB_TIME_NS_IN_MSEC; a failing
 * qb_loop_timer_add is logged at error level. The branch before those
 * two cases asserts, i.e. the callback must not fire in the states it
 * covers.
 *
 * NOTE(review): listing lines 1818-1819, the case labels of the
 * asserting branch, are absent from this copy - confirm which states
 * they name against the upstream file.
 */
1812static void memb_timer_function_state_gather (void *data)
1813{
1814 struct totemsrp_instance *instance = data;
1815 int32_t res;
1816
1817 switch (instance->memb_state) {
1820 assert (0); /* this should never happen */
1821 break;
1822 case MEMB_STATE_GATHER:
1823 case MEMB_STATE_COMMIT:
1824 memb_join_message_send (instance);
1825
1826 /*
1827 * Restart the join timeout
1828 */
1829 qb_loop_timer_del (instance->totemsrp_poll_handle, instance->memb_timer_state_gather_join_timeout);
1830
1831 res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1832 QB_LOOP_MED,
1833 instance->totem_config->join_timeout*QB_TIME_NS_IN_MSEC,
1834 (void *)instance,
1835 memb_timer_function_state_gather,
1836 &instance->memb_timer_state_gather_join_timeout);
1837
1838 if (res != 0) {
1839 log_printf(instance->totemsrp_log_level_error, "memb_timer_function_state_gather - qb_loop_timer_add error : %d", res);
1840 }
1841 break;
1842 }
1843}
1844
/*
 * Consensus timer callback. data is the totemsrp instance; the work is
 * done by memb_state_consensus_timeout_expired.
 */
static void memb_timer_function_gather_consensus_timeout (void *data)
{
	struct totemsrp_instance *srp = data;

	memb_state_consensus_timeout_expired (srp);
}
1850
/*
 * Copy messages of the previous ring from the recovery sort queue into
 * the regular sort queue.
 *
 * Sequence numbers SEQNO_START_MSG + 1 .. my_aru of the recovery sort
 * queue are visited. An encapsulated message is unwrapped by skipping
 * one struct mcast header (pointer advanced, length reduced). The inner
 * message is added to the regular sort queue only when its ring id is
 * byte-identical to my_old_ring_id and its sequence number is not yet in
 * use there. The queue item is copied by sq_item_add from a stack
 * variable; the payload pointer still refers into the recovery message.
 *
 * NOTE(review): listing lines 1861, 1881, 1898, 1915 and 1919 are absent
 * from this copy: the heads of three calls that take message strings
 * (presumably log calls), the condition that opens the "encapsulated"
 * branch, and the body of the sq_lt_compare check (presumably it raises
 * old_ring_state_high_seq_received). Confirm against the upstream file.
 */
1851static void deliver_messages_from_recovery_to_regular (struct totemsrp_instance *instance)
1852{
1853 unsigned int i;
1854 struct sort_queue_item *recovery_message_item;
1855 struct sort_queue_item regular_message_item;
1856 unsigned int range = 0;
1857 int res;
1858 void *ptr;
1859 struct mcast *mcast;
1860
1862 "recovery to regular %x-%x", SEQNO_START_MSG + 1, instance->my_aru);
1863
1864 range = instance->my_aru - SEQNO_START_MSG;
1865 /*
1866 * Move messages from recovery to regular sort queue
1867 */
1868// todo should i be initialized to 0 or 1 ?
1869 for (i = 1; i <= range; i++) {
1870 res = sq_item_get (&instance->recovery_sort_queue,
1871 i + SEQNO_START_MSG, &ptr);
/* Sequence numbers with no stored item are skipped */
1872 if (res != 0) {
1873 continue;
1874 }
1875 recovery_message_item = ptr;
1876
1877 /*
1878 * Convert recovery message into regular message
1879 */
1880 mcast = recovery_message_item->mcast;
1882 /*
1883 * Message is a recovery message encapsulated
1884 * in a new ring message
1885 */
1886 regular_message_item.mcast =
1887 (struct mcast *)(((char *)recovery_message_item->mcast) + sizeof (struct mcast));
1888 regular_message_item.msg_len =
1889 recovery_message_item->msg_len - sizeof (struct mcast);
1890 mcast = regular_message_item.mcast;
1891 } else {
1892 /*
1893 * TODO this case shouldn't happen
1894 */
1895 continue;
1896 }
1897
1899 "comparing if ring id is for this processors old ring seqno " CS_PRI_RING_ID_SEQ,
1900 (uint64_t)mcast->seq);
1901
1902 /*
1903 * Only add this message to the regular sort
1904 * queue if it was originated with the same ring
1905 * id as the previous ring
1906 */
1907 if (memcmp (&instance->my_old_ring_id, &mcast->ring_id,
1908 sizeof (struct memb_ring_id)) == 0) {
1909
1910 res = sq_item_inuse (&instance->regular_sort_queue, mcast->seq);
1911 if (res == 0) {
1912 sq_item_add (&instance->regular_sort_queue,
1913 &regular_message_item, mcast->seq);
1914 if (sq_lt_compare (instance->old_ring_state_high_seq_received, mcast->seq)) {
1916 }
1917 }
1918 } else {
1920 "-not adding msg with seq no " CS_PRI_RING_ID_SEQ, (uint64_t)mcast->seq);
1921 }
1922 }
1923}
1924
1925/*
1926 * Change states in the state machine of the membership algorithm
1927 */
/*
 * Enter the operational state of the membership state machine.
 *
 * Visible steps, in order:
 *  1. clear originated_orf_token, reset consensus and the saved old ring
 *     state flag, move old ring messages from the recovery to the regular
 *     sort queue
 *  2. with my_aru temporarily set to old_ring_state_aru, deliver messages
 *     to the application (messages_deliver_to_app with second argument 0)
 *  3. compute the left list (my_memb_list minus my_trans_memb_list) and
 *     the joined list (my_new_memb_list minus my_trans_memb_list), then
 *     install my_new_memb_list as my_memb_list
 *  4. convert the lists to node ids for the transitional configuration,
 *     set waiting_trans_ack, deliver again (second argument 1), restore
 *     my_aru and convert the lists for the regular configuration
 *  5. copy the recovery sort queue over the regular one, reset
 *     my_last_aru, replace my_proc_list by the new membership and empty
 *     my_failed_list
 *  6. find my_high_delivered by walking down from my_high_seq_received,
 *     free the mcast buffers up to it and release those queue items
 *  7. build the " joined:", " left:" and " failed:" strings, clear the
 *     leave list, update statistics, set my_received_flg, restart the
 *     pause timer and remember my_ring_id in my_old_ring_id
 *
 * NOTE(review): listing lines 1951, 1988, 2011, 2022, 2131, 2133, 2141
 * and 2146 are absent from this copy. They hold the heads of the calls
 * that take the visible argument lists (presumably log calls and the
 * configuration change callbacks), the totempg buffer switch and one
 * further statement before the statistics update. Confirm against the
 * upstream file.
 */
1928static void memb_state_operational_enter (struct totemsrp_instance *instance)
1929{
1930 struct srp_addr joined_list[PROCESSOR_COUNT_MAX];
1931 int joined_list_entries = 0;
1932 unsigned int aru_save;
1933 unsigned int joined_list_totemip[PROCESSOR_COUNT_MAX];
1934 unsigned int trans_memb_list_totemip[PROCESSOR_COUNT_MAX];
1935 unsigned int new_memb_list_totemip[PROCESSOR_COUNT_MAX];
1936 unsigned int left_list[PROCESSOR_COUNT_MAX];
1937 unsigned int i;
1938 unsigned int res;
1939 char left_node_msg[1024];
1940 char joined_node_msg[1024];
1941 char failed_node_msg[1024];
1942
1943 instance->originated_orf_token = 0;
1944
1945 memb_consensus_reset (instance);
1946
1947 old_ring_state_reset (instance);
1948
1949 deliver_messages_from_recovery_to_regular (instance);
1950
1952 "Delivering to app %x to %x",
1953 instance->my_high_delivered + 1, instance->old_ring_state_high_seq_received);
1954
/* my_aru is swapped to the old ring value for delivery and restored below */
1955 aru_save = instance->my_aru;
1956 instance->my_aru = instance->old_ring_state_aru;
1957
1958 messages_deliver_to_app (instance, 0, instance->old_ring_state_high_seq_received);
1959
1960 /*
1961 * Calculate joined and left list
1962 */
1963 memb_set_subtract (instance->my_left_memb_list,
1964 &instance->my_left_memb_entries,
1965 instance->my_memb_list, instance->my_memb_entries,
1966 instance->my_trans_memb_list, instance->my_trans_memb_entries);
1967
1968 memb_set_subtract (joined_list, &joined_list_entries,
1969 instance->my_new_memb_list, instance->my_new_memb_entries,
1970 instance->my_trans_memb_list, instance->my_trans_memb_entries);
1971
1972 /*
1973 * Install new membership
1974 */
1975 instance->my_memb_entries = instance->my_new_memb_entries;
1976 memcpy (&instance->my_memb_list, instance->my_new_memb_list,
1977 sizeof (struct srp_addr) * instance->my_memb_entries);
1978 instance->last_released = 0;
1979 instance->my_set_retrans_flg = 0;
1980
1981 /*
1982 * Deliver transitional configuration to application
1983 */
1984 srp_addr_to_nodeid (instance, left_list, instance->my_left_memb_list,
1985 instance->my_left_memb_entries);
1986 srp_addr_to_nodeid (instance, trans_memb_list_totemip,
1987 instance->my_trans_memb_list, instance->my_trans_memb_entries);
1989 trans_memb_list_totemip, instance->my_trans_memb_entries,
1990 left_list, instance->my_left_memb_entries,
1991 0, 0, &instance->my_ring_id);
1992 /*
1993 * Switch new totemsrp messages queue. Messages sent from now on are stored
1994 * in different queue so synchronization messages are delivered first. Totempg
1995 * buffers will be switched later.
1996 */
1997 instance->waiting_trans_ack = 1;
1998
1999// TODO we need to filter to ensure we only deliver those
2000// messages which are part of instance->my_deliver_memb
2001 messages_deliver_to_app (instance, 1, instance->old_ring_state_high_seq_received);
2002
2003 /*
2004 * Switch totempg buffers. This used to be right after
2005 * instance->waiting_trans_ack = 1;
2006 * line. This was causing problem, because there may be not yet
2007 * processed parts of messages in totempg buffers.
2008 * So when buffers were switched and recovered messages
2009 * got delivered it was not possible to assemble them.
2010 */
2012
2013 instance->my_aru = aru_save;
2014
2015 /*
2016 * Deliver regular configuration to application
2017 */
2018 srp_addr_to_nodeid (instance, new_memb_list_totemip,
2019 instance->my_new_memb_list, instance->my_new_memb_entries);
2020 srp_addr_to_nodeid (instance, joined_list_totemip, joined_list,
2021 joined_list_entries);
2023 new_memb_list_totemip, instance->my_new_memb_entries,
2024 0, 0,
2025 joined_list_totemip, joined_list_entries, &instance->my_ring_id);
2026
2027 /*
2028 * The recovery sort queue now becomes the regular
2029 * sort queue. It is necessary to copy the state
2030 * into the regular sort queue.
2031 */
2032 sq_copy (&instance->regular_sort_queue, &instance->recovery_sort_queue);
2033 instance->my_last_aru = SEQNO_START_MSG;
2034
2035 /* When making my_proc_list smaller, ensure that the
2036 * now non-used entries are zero-ed out. There are some suspect
2037 * assert's that assume that there is always 2 entries in the list.
2038 * These fail when my_proc_list is reduced to 1 entry (and the
2039 * valid [0] entry is the same as the 'unused' [1] entry).
2040 */
2041 memset(instance->my_proc_list, 0,
2042 sizeof (struct srp_addr) * instance->my_proc_list_entries);
2043
2044 instance->my_proc_list_entries = instance->my_new_memb_entries;
/* my_memb_entries equals my_new_memb_entries here (assigned above) */
2045 memcpy (instance->my_proc_list, instance->my_new_memb_list,
2046 sizeof (struct srp_addr) * instance->my_memb_entries);
2047
2048 instance->my_failed_list_entries = 0;
2049 /*
2050 * TODO Not exactly to spec
2051 *
2052 * At the entry to this function all messages without a gap are
2053 * deliered.
2054 *
2055 * This code throw away messages from the last gap in the sort queue
2056 * to my_high_seq_received
2057 *
2058 * What should really happen is we should deliver all messages up to
2059 * a gap, then delier the transitional configuration, then deliver
2060 * the messages between the first gap and my_high_seq_received, then
2061 * deliver a regular configuration, then deliver the regular
2062 * configuration
2063 *
2064 * Unfortunately totempg doesn't appear to like this operating mode
2065 * which needs more inspection
2066 */
/*
 * Walk down from my_high_seq_received until sq_item_get returns 0 for a
 * sequence number or 0 is reached; that number becomes my_high_delivered.
 */
2067 i = instance->my_high_seq_received + 1;
2068 do {
2069 void *ptr;
2070
2071 i -= 1;
2072 res = sq_item_get (&instance->regular_sort_queue, i, &ptr);
2073 if (i == 0) {
2074 break;
2075 }
2076 } while (res);
2077
2078 instance->my_high_delivered = i;
2079
/* Free the message buffers of every stored item up to my_high_delivered */
2080 for (i = 0; i <= instance->my_high_delivered; i++) {
2081 void *ptr;
2082
2083 res = sq_item_get (&instance->regular_sort_queue, i, &ptr);
2084 if (res == 0) {
2085 struct sort_queue_item *regular_message;
2086
2087 regular_message = ptr;
2088 free (regular_message->mcast);
2089 }
2090 }
2091 sq_items_release (&instance->regular_sort_queue, instance->my_high_delivered);
2092 instance->last_released = instance->my_high_delivered;
2093
/*
 * NOTE(review): in the three string builders below sptr/sptr2 accumulate
 * the snprintf return value, which is the length that would have been
 * written. Once that exceeds the 1024 byte buffer, "sizeof(...) - sptr"
 * wraps around as an unsigned value and the destination pointer lies
 * past the buffer. Whether enough ids can be listed to reach that
 * depends on PROCESSOR_COUNT_MAX and the width of CS_PRI_NODE_ID -
 * verify and clamp if so.
 */
2094 if (joined_list_entries) {
2095 int sptr = 0;
2096 sptr += snprintf(joined_node_msg, sizeof(joined_node_msg)-sptr, " joined:");
2097 for (i=0; i< joined_list_entries; i++) {
2098 sptr += snprintf(joined_node_msg+sptr, sizeof(joined_node_msg)-sptr, " " CS_PRI_NODE_ID, joined_list_totemip[i]);
2099 }
2100 }
2101 else {
2102 joined_node_msg[0] = '\0';
2103 }
2104
2105 if (instance->my_left_memb_entries) {
2106 int sptr = 0;
2107 int sptr2 = 0;
2108 sptr += snprintf(left_node_msg, sizeof(left_node_msg)-sptr, " left:");
2109 for (i=0; i< instance->my_left_memb_entries; i++) {
2110 sptr += snprintf(left_node_msg+sptr, sizeof(left_node_msg)-sptr, " " CS_PRI_NODE_ID, left_list[i]);
2111 }
/* Left nodes that are not in the leave list are reported as failed */
2112 for (i=0; i< instance->my_left_memb_entries; i++) {
2113 if (my_leave_memb_match(instance, left_list[i]) == 0) {
2114 if (sptr2 == 0) {
2115 sptr2 += snprintf(failed_node_msg, sizeof(failed_node_msg)-sptr2, " failed:");
2116 }
/*
 * NOTE(review): the size below uses sizeof(left_node_msg) although the
 * destination is failed_node_msg; both arrays are 1024 bytes, so the
 * value is the same today.
 */
2117 sptr2 += snprintf(failed_node_msg+sptr2, sizeof(left_node_msg)-sptr2, " " CS_PRI_NODE_ID, left_list[i]);
2118 }
2119 }
2120 if (sptr2 == 0) {
2121 failed_node_msg[0] = '\0';
2122 }
2123 }
2124 else {
2125 left_node_msg[0] = '\0';
2126 failed_node_msg[0] = '\0';
2127 }
2128
2129 my_leave_memb_clear(instance);
2130
2132 "entering OPERATIONAL state.");
2134 "A new membership (" CS_PRI_RING_ID ") was formed. Members%s%s",
2135 instance->my_ring_id.rep,
2136 (uint64_t)instance->my_ring_id.seq,
2137 joined_node_msg,
2138 left_node_msg);
2139
2140 if (strlen(failed_node_msg)) {
2142 "Failed to receive the leave message.%s",
2143 failed_node_msg);
2144 }
2145
2147
2148 instance->stats.operational_entered++;
2149 instance->stats.continuous_gather = 0;
2150
2151 instance->my_received_flg = 1;
2152
2153 reset_pause_timeout (instance);
2154
2155 /*
2156 * Save ring id information from this configuration to determine
2157 * which processors are transitioning from old regular configuration
2158 * in to new regular configuration on the next configuration change
2159 */
2160 memcpy (&instance->my_old_ring_id, &instance->my_ring_id,
2161 sizeof (struct memb_ring_id));
2162
2163 return;
2164}
2165
/*
 * Enter the gather state of the membership state machine.
 *
 * gather_from identifies the reason for the transition.
 *
 * Visible steps: set orf_token_discard, clear originated_orf_token, make
 * sure my_id is part of my_proc_list, send a join message, re-arm the
 * join timer (totem_config->join_timeout) and the consensus timer
 * (totem_config->consensus_timeout), cancel the token retransmit, token
 * loss and merge detect timers, reset consensus and mark this processor,
 * then set memb_state to MEMB_STATE_GATHER and count the transition in
 * stats.gather_entered. Failing qb_loop_timer_add calls are logged at
 * error level.
 *
 * NOTE(review): listing lines 2201, 2224 and 2231 are absent from this
 * copy: the tail of the consensus timer qb_loop_timer_del call, the head
 * of the call that takes the "entering GATHER state" string (presumably
 * a log call) and the condition guarding the continuous_gather
 * increment. Confirm against the upstream file.
 */
2166static void memb_state_gather_enter (
2167 struct totemsrp_instance *instance,
2168 enum gather_state_from gather_from)
2169{
2170 int32_t res;
2171
2172 instance->orf_token_discard = 1;
2173
2174 instance->originated_orf_token = 0;
2175
2176 memb_set_merge (
2177 &instance->my_id, 1,
2178 instance->my_proc_list, &instance->my_proc_list_entries);
2179
2180 memb_join_message_send (instance);
2181
2182 /*
2183 * Restart the join timeout
2184 */
2185 qb_loop_timer_del (instance->totemsrp_poll_handle, instance->memb_timer_state_gather_join_timeout);
2186
2187 res = qb_loop_timer_add (instance->totemsrp_poll_handle,
2188 QB_LOOP_MED,
2189 instance->totem_config->join_timeout*QB_TIME_NS_IN_MSEC,
2190 (void *)instance,
2191 memb_timer_function_state_gather,
2192 &instance->memb_timer_state_gather_join_timeout);
2193 if (res != 0) {
2194 log_printf(instance->totemsrp_log_level_error, "memb_state_gather_enter - qb_loop_timer_add error(1) : %d", res);
2195 }
2196
2197 /*
2198 * Restart the consensus timeout
2199 */
2200 qb_loop_timer_del (instance->totemsrp_poll_handle,
2202
2203 res = qb_loop_timer_add (instance->totemsrp_poll_handle,
2204 QB_LOOP_MED,
2205 instance->totem_config->consensus_timeout*QB_TIME_NS_IN_MSEC,
2206 (void *)instance,
2207 memb_timer_function_gather_consensus_timeout,
2208 &instance->memb_timer_state_gather_consensus_timeout);
2209 if (res != 0) {
2210 log_printf(instance->totemsrp_log_level_error, "memb_state_gather_enter - qb_loop_timer_add error(2) : %d", res);
2211 }
2212
2213 /*
2214 * Cancel the token loss and token retransmission timeouts
2215 */
2216 cancel_token_retransmit_timeout (instance); // REVIEWED
2217 cancel_token_timeout (instance); // REVIEWED
2218 cancel_merge_detect_timeout (instance);
2219
2220 memb_consensus_reset (instance);
2221
2222 memb_consensus_set (instance, &instance->my_id);
2223
2225 "entering GATHER state from %d(%s).",
2226 gather_from, gsfrom_to_msg(gather_from));
2227
2228 instance->memb_state = MEMB_STATE_GATHER;
2229 instance->stats.gather_entered++;
2230
2232 /*
2233 * State 3 means gather, so we are continuously gathering.
2234 */
2235 instance->stats.continuous_gather++;
2236 }
2237
2238 return;
2239}
2240
2241static void timer_function_token_retransmit_timeout (void *data);
2242
/*
 * Callback taking the totemsrp instance as context; it sends the commit
 * token via memb_state_commit_token_send.
 */
static void target_set_completed (
	void *context)
{
	struct totemsrp_instance *srp = context;

	memb_state_commit_token_send (srp);
}
2251
/*
 * Enter the commit state of the membership state machine.
 *
 * Visible steps: save the old ring state, update the commit token and
 * set its target, delete the gather join and consensus timers, adopt the
 * commit token ring id (memb_ring_id_set), store the ring id through the
 * memb_ring_id_store callback, copy my_ring_id.seq to token_ring_id_seq,
 * set memb_state to MEMB_STATE_COMMIT, restart the token retransmit and
 * token loss timers, update statistics and zero the flow control
 * counters my_trc, my_pbl and my_cbl.
 *
 * NOTE(review): listing lines 2263, 2267 and 2275 are absent from this
 * copy: one statement after each timer deletion and the head of the call
 * that takes the "entering COMMIT state." string (presumably a log
 * call). Confirm against the upstream file.
 */
2252static void memb_state_commit_enter (
2253 struct totemsrp_instance *instance)
2254{
2255 old_ring_state_save (instance);
2256
2257 memb_state_commit_token_update (instance);
2258
2259 memb_state_commit_token_target_set (instance);
2260
2261 qb_loop_timer_del (instance->totemsrp_poll_handle, instance->memb_timer_state_gather_join_timeout);
2262
2264
2265 qb_loop_timer_del (instance->totemsrp_poll_handle, instance->memb_timer_state_gather_consensus_timeout);
2266
2268
2269 memb_ring_id_set (instance, &instance->commit_token->ring_id);
2270
2271 instance->memb_ring_id_store (&instance->my_ring_id, instance->my_id.nodeid);
2272
2273 instance->token_ring_id_seq = instance->my_ring_id.seq;
2274
2276 "entering COMMIT state.");
2277
2278 instance->memb_state = MEMB_STATE_COMMIT;
2279 reset_token_retransmit_timeout (instance); // REVIEWED
2280 reset_token_timeout (instance); // REVIEWED
2281
2282 instance->stats.commit_entered++;
2283 instance->stats.continuous_gather = 0;
2284
2285 /*
2286 * reset all flow control variables since we are starting a new ring
2287 */
2288 instance->my_trc = 0;
2289 instance->my_pbl = 0;
2290 instance->my_cbl = 0;
2291 /*
2292 * commit token sent after callback that token target has been set
2293 */
2294}
2295
2296static void memb_state_recovery_enter (
2297 struct totemsrp_instance *instance,
2299{
2300 int i;
2301 int local_received_flg = 1;
2302 unsigned int low_ring_aru;
2303 unsigned int range = 0;
2304 unsigned int messages_originated = 0;
2305 const struct srp_addr *addr;
2306 struct memb_commit_token_memb_entry *memb_list;
2307 struct memb_ring_id my_new_memb_ring_id_list[PROCESSOR_COUNT_MAX];
2308
2309 addr = (const struct srp_addr *)commit_token->end_of_commit_token;
2310 memb_list = (struct memb_commit_token_memb_entry *)(addr + commit_token->addr_entries);
2311
2313 "entering RECOVERY state.");
2314
2315 instance->orf_token_discard = 0;
2316
2317 instance->my_high_ring_delivered = 0;
2318
2319 sq_reinit (&instance->recovery_sort_queue, SEQNO_START_MSG);
2320 cs_queue_reinit (&instance->retrans_message_queue);
2321
2322 low_ring_aru = instance->old_ring_state_high_seq_received;
2323
2324 memb_state_commit_token_send_recovery (instance, commit_token);
2325
2326 instance->my_token_seq = SEQNO_START_TOKEN - 1;
2327
2328 /*
2329 * Build regular configuration
2330 */
2332 instance->totemnet_context,
2333 commit_token->addr_entries);
2334
2335 /*
2336 * Build transitional configuration
2337 */
2338 for (i = 0; i < instance->my_new_memb_entries; i++) {
2339 memcpy (&my_new_memb_ring_id_list[i],
2340 &memb_list[i].ring_id,
2341 sizeof (struct memb_ring_id));
2342 }
2343 memb_set_and_with_ring_id (
2344 instance->my_new_memb_list,
2345 my_new_memb_ring_id_list,
2346 instance->my_new_memb_entries,
2347 instance->my_memb_list,
2348 instance->my_memb_entries,
2349 &instance->my_old_ring_id,
2350 instance->my_trans_memb_list,
2351 &instance->my_trans_memb_entries);
2352
2353 for (i = 0; i < instance->my_trans_memb_entries; i++) {
2355 "TRANS [%d] member " CS_PRI_NODE_ID ":", i, instance->my_trans_memb_list[i].nodeid);
2356 }
2357 for (i = 0; i < instance->my_new_memb_entries; i++) {
2359 "position [%d] member " CS_PRI_NODE_ID ":", i, addr[i].nodeid);
2361 "previous ringid (" CS_PRI_RING_ID ")",
2362 memb_list[i].ring_id.rep, (uint64_t)memb_list[i].ring_id.seq);
2363
2365 "aru %x high delivered %x received flag %d",
2366 memb_list[i].aru,
2367 memb_list[i].high_delivered,
2368 memb_list[i].received_flg);
2369
2370 // assert (totemip_print (&memb_list[i].ring_id.rep) != 0);
2371 }
2372 /*
2373 * Determine if any received flag is false
2374 */
2375 for (i = 0; i < commit_token->addr_entries; i++) {
2376 if (memb_set_subset (&instance->my_new_memb_list[i], 1,
2377 instance->my_trans_memb_list, instance->my_trans_memb_entries) &&
2378
2379 memb_list[i].received_flg == 0) {
2380 instance->my_deliver_memb_entries = instance->my_trans_memb_entries;
2381 memcpy (instance->my_deliver_memb_list, instance->my_trans_memb_list,
2382 sizeof (struct srp_addr) * instance->my_trans_memb_entries);
2383 local_received_flg = 0;
2384 break;
2385 }
2386 }
2387 if (local_received_flg == 1) {
2388 goto no_originate;
2389 } /* Else originate messages if we should */
2390
2391 /*
2392 * Calculate my_low_ring_aru, instance->my_high_ring_delivered for the transitional membership
2393 */
2394 for (i = 0; i < commit_token->addr_entries; i++) {
2395 if (memb_set_subset (&instance->my_new_memb_list[i], 1,
2396 instance->my_deliver_memb_list,
2397 instance->my_deliver_memb_entries) &&
2398
2399 memcmp (&instance->my_old_ring_id,
2400 &memb_list[i].ring_id,
2401 sizeof (struct memb_ring_id)) == 0) {
2402
2403 if (sq_lt_compare (memb_list[i].aru, low_ring_aru)) {
2404
2405 low_ring_aru = memb_list[i].aru;
2406 }
2407 if (sq_lt_compare (instance->my_high_ring_delivered, memb_list[i].high_delivered)) {
2408 instance->my_high_ring_delivered = memb_list[i].high_delivered;
2409 }
2410 }
2411 }
2412
2413 /*
2414 * Copy all old ring messages to instance->retrans_message_queue
2415 */
2416 range = instance->old_ring_state_high_seq_received - low_ring_aru;
2417 if (range == 0) {
2418 /*
2419 * No messages to copy
2420 */
2421 goto no_originate;
2422 }
2423 assert (range < QUEUE_RTR_ITEMS_SIZE_MAX);
2424
2426 "copying all old ring messages from %x-%x.",
2427 low_ring_aru + 1, instance->old_ring_state_high_seq_received);
2428
2429 for (i = 1; i <= range; i++) {
2432 void *ptr;
2433 int res;
2434
2435 res = sq_item_get (&instance->regular_sort_queue,
2436 low_ring_aru + i, &ptr);
2437 if (res != 0) {
2438 continue;
2439 }
2440 sort_queue_item = ptr;
2441 messages_originated++;
2442 memset (&message_item, 0, sizeof (struct message_item));
2443 // TODO LEAK
2444 message_item.mcast = totemsrp_buffer_alloc (instance);
2445 assert (message_item.mcast);
2446 memset(message_item.mcast, 0, sizeof (struct mcast));
2450 message_item.mcast->system_from = instance->my_id;
2452
2454 assert (message_item.mcast->header.nodeid);
2455 memcpy (&message_item.mcast->ring_id, &instance->my_ring_id,
2456 sizeof (struct memb_ring_id));
2457 message_item.msg_len = sort_queue_item->msg_len + sizeof (struct mcast);
2458 memcpy (((char *)message_item.mcast) + sizeof (struct mcast),
2461 cs_queue_item_add (&instance->retrans_message_queue, &message_item);
2462 }
2464 "Originated %d messages in RECOVERY.", messages_originated);
2465 goto originated;
2466
2467no_originate:
2469 "Did not need to originate any messages in recovery.");
2470
2471originated:
2472 instance->my_aru = SEQNO_START_MSG;
2473 instance->my_aru_count = 0;
2474 instance->my_seq_unchanged = 0;
2476 instance->my_install_seq = SEQNO_START_MSG;
2477 instance->last_released = SEQNO_START_MSG;
2478
2479 reset_token_timeout (instance); // REVIEWED
2480 reset_token_retransmit_timeout (instance); // REVIEWED
2481
2482 instance->memb_state = MEMB_STATE_RECOVERY;
2483 instance->stats.recovery_entered++;
2484 instance->stats.continuous_gather = 0;
2485
2486 return;
2487}
2488
2489void totemsrp_event_signal (void *srp_context, enum totem_event_type type, int value)
2490{
2491 struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
2492
2493 token_hold_cancel_send (instance);
2494
2495 return;
2496}
2497
/*
 * Queue a message for multicast.
 *
 * The iovec payload is copied behind a zeroed struct mcast header in a
 * buffer obtained from totemsrp_buffer_alloc, and the resulting item is
 * added to the pending queue selected below.
 *
 * Returns 0 when the message was queued, -1 when the queue is full or no
 * buffer could be allocated.
 *
 * NOTE(review): the declaration line and several header-setup statements
 * are not visible in this listing; the guarantee parameter is not used in
 * the visible lines - confirm against the full source.
 */
2499 void *srp_context,
2500 struct iovec *iovec,
2501 unsigned int iov_len,
2502 int guarantee)
2503{
2504 struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
2505 int i;
2507 char *addr;
2508 unsigned int addr_idx;
2509 struct cs_queue *queue_use;
2510
	/* While waiting_trans_ack is set, new messages go to the _trans queue */
2511 if (instance->waiting_trans_ack) {
2512 queue_use = &instance->new_message_queue_trans;
2513 } else {
2514 queue_use = &instance->new_message_queue;
2515 }
2516
2517 if (cs_queue_is_full (queue_use)) {
2518 log_printf (instance->totemsrp_log_level_debug, "queue full");
2519 return (-1);
2520 }
2521
2522 memset (&message_item, 0, sizeof (struct message_item));
2523
2524 /*
2525 * Allocate pending item
2526 */
2527 message_item.mcast = totemsrp_buffer_alloc (instance);
2528 if (message_item.mcast == 0) {
2529 goto error_mcast;
2530 }
2531
2532 /*
2533 * Set mcast header
2534 */
2535 memset(message_item.mcast, 0, sizeof (struct mcast));
2540
2542 assert (message_item.mcast->header.nodeid);
2543
2545 message_item.mcast->system_from = instance->my_id;
2546
	/*
	 * Append every iovec segment directly behind the mcast header.
	 * NOTE(review): no check is visible here that the summed iov_len
	 * values fit in the allocated buffer - confirm callers bound it.
	 */
2547 addr = (char *)message_item.mcast;
2548 addr_idx = sizeof (struct mcast);
2549 for (i = 0; i < iov_len; i++) {
2550 memcpy (&addr[addr_idx], iovec[i].iov_base, iovec[i].iov_len);
2551 addr_idx += iovec[i].iov_len;
2552 }
2553
	/* msg_len covers the header plus all copied payload bytes */
2554 message_item.msg_len = addr_idx;
2555
2556 log_printf (instance->totemsrp_log_level_trace, "mcasted message added to pending queue");
2557 instance->stats.mcast_tx++;
2558 cs_queue_item_add (queue_use, &message_item);
2559
2560 return (0);
2561
2562error_mcast:
2563 return (-1);
2564}
2565
2566/*
2567 * Determine if there is room to queue a new message
2568 */
2569int totemsrp_avail (void *srp_context)
2570{
2571 struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
2572 int avail;
2573 struct cs_queue *queue_use;
2574
2575 if (instance->waiting_trans_ack) {
2576 queue_use = &instance->new_message_queue_trans;
2577 } else {
2578 queue_use = &instance->new_message_queue;
2579 }
2580 cs_queue_avail (queue_use, &avail);
2581
2582 return (avail);
2583}
2584
2585/*
2586 * ORF Token Management
2587 */
2588/*
2589 * Recast message to mcast group if it is available
 *
 * seq is looked up in the recovery sort queue while in RECOVERY state and
 * in the regular sort queue otherwise.
 *
 * Returns 0 when an item for seq was found, -1 when seq is outside the
 * sort queue range or no item is stored for it.
 *
 * NOTE(review): the declaration of sort_queue_item and the call that
 * receives instance->totemnet_context are not visible in this listing.
2590 */
2591static int orf_token_remcast (
2592 struct totemsrp_instance *instance,
2593 int seq)
2594{
2596 int res;
2597 void *ptr;
2598
2599 struct sq *sort_queue;
2600
2601 if (instance->memb_state == MEMB_STATE_RECOVERY) {
2602 sort_queue = &instance->recovery_sort_queue;
2603 } else {
2604 sort_queue = &instance->regular_sort_queue;
2605 }
2606
	/* sq_in_range returning 0 is treated as "seq not in range" */
2607 res = sq_in_range (sort_queue, seq);
2608 if (res == 0) {
2609 log_printf (instance->totemsrp_log_level_debug, "sq not in range");
2610 return (-1);
2611 }
2612
2613 /*
2614 * Get RTR item at seq, if not available, return
2615 */
2616 res = sq_item_get (sort_queue, seq, &ptr);
2617 if (res != 0) {
2618 return -1;
2619 }
2620
2621 sort_queue_item = ptr;
2622
2624 instance->totemnet_context,
2627
2628 return (0);
2629}
2630
2631
2632/*
2633 * Free all freeable messages from ring
 *
 * The release point is the earliest (per sq_lt_compare) of token_aru,
 * my_last_aru and my_high_delivered. Every sequence number after
 * last_released up to that point has its buffer released (when an item
 * is stored) and its sort queue slot released; last_released is then
 * advanced to the release point.
2634 */
2635static void messages_free (
2636 struct totemsrp_instance *instance,
2637 unsigned int token_aru)
2638{
2639 struct sort_queue_item *regular_message;
2640 unsigned int i;
2641 int res;
2642 int log_release = 0;
2643 unsigned int release_to;
2644 unsigned int range = 0;
2645
	/* Clamp the release point down to my_last_aru and my_high_delivered */
2646 release_to = token_aru;
2647 if (sq_lt_compare (instance->my_last_aru, release_to)) {
2648 release_to = instance->my_last_aru;
2649 }
2650 if (sq_lt_compare (instance->my_high_delivered, release_to)) {
2651 release_to = instance->my_high_delivered;
2652 }
2653
2654 /*
2655 * Ensure we don't try release before an already released point
2656 */
2657 if (sq_lt_compare (release_to, instance->last_released)) {
2658 return;
2659 }
2660
2661 range = release_to - instance->last_released;
2662 assert (range < QUEUE_RTR_ITEMS_SIZE_MAX);
2663
2664 /*
2665 * Release retransmit list items if group aru indicates they are transmitted
2666 */
2667 for (i = 1; i <= range; i++) {
2668 void *ptr;
2669
2670 res = sq_item_get (&instance->regular_sort_queue,
2671 instance->last_released + i, &ptr);
		/* Only slots that actually hold an item have a buffer to release */
2672 if (res == 0) {
2673 regular_message = ptr;
2674 totemsrp_buffer_release (instance, regular_message->mcast);
2675 }
2676 sq_items_release (&instance->regular_sort_queue,
2677 instance->last_released + i);
2678
2679 log_release = 1;
2680 }
2681 instance->last_released += range;
2682
	/* Logged only when at least one sequence number was processed above */
2683 if (log_release) {
2685 "releasing messages up to and including %x", release_to);
2686 }
2687}
2688
2689static void update_aru (
2690 struct totemsrp_instance *instance)
2691{
2692 unsigned int i;
2693 int res;
2694 struct sq *sort_queue;
2695 unsigned int range;
2696 unsigned int my_aru_saved = 0;
2697
2698 if (instance->memb_state == MEMB_STATE_RECOVERY) {
2699 sort_queue = &instance->recovery_sort_queue;
2700 } else {
2701 sort_queue = &instance->regular_sort_queue;
2702 }
2703
2704 range = instance->my_high_seq_received - instance->my_aru;
2705
2706 my_aru_saved = instance->my_aru;
2707 for (i = 1; i <= range; i++) {
2708
2709 void *ptr;
2710
2711 res = sq_item_get (sort_queue, my_aru_saved + i, &ptr);
2712 /*
2713 * If hole, stop updating aru
2714 */
2715 if (res != 0) {
2716 break;
2717 }
2718 }
2719 instance->my_aru += i - 1;
2720}
2721
2722/*
2723 * Multicasts pending messages onto the ring (requires orf_token possession)
 *
 * Up to fcc_mcasts_allowed items are taken from the pending queue. Each is
 * stamped with the next token sequence number and the next global_seqno,
 * added to the sort queue and removed from the pending queue. update_aru()
 * is called once afterwards.
 *
 * NOTE(review): the sort_queue_item declaration, the lines that fill it,
 * the assignment of the local mcast pointer and the transmit call are not
 * visible in this listing.
2724 */
2725static int orf_token_mcast (
2726 struct totemsrp_instance *instance,
2727 struct orf_token *token,
2728 int fcc_mcasts_allowed)
2729{
2730 struct message_item *message_item = 0;
2731 struct cs_queue *mcast_queue;
2732 struct sq *sort_queue;
2734 struct mcast *mcast;
2735 unsigned int fcc_mcast_current;
2736
	/*
	 * RECOVERY sends from retrans_message_queue into the recovery sort
	 * queue; otherwise new messages (the _trans queue while
	 * waiting_trans_ack is set) go into the regular sort queue.
	 */
2737 if (instance->memb_state == MEMB_STATE_RECOVERY) {
2738 mcast_queue = &instance->retrans_message_queue;
2739 sort_queue = &instance->recovery_sort_queue;
2740 reset_token_retransmit_timeout (instance); // REVIEWED
2741 } else {
2742 if (instance->waiting_trans_ack) {
2743 mcast_queue = &instance->new_message_queue_trans;
2744 } else {
2745 mcast_queue = &instance->new_message_queue;
2746 }
2747
2748 sort_queue = &instance->regular_sort_queue;
2749 }
2750
2751 for (fcc_mcast_current = 0; fcc_mcast_current < fcc_mcasts_allowed; fcc_mcast_current++) {
2752 if (cs_queue_is_empty (mcast_queue)) {
2753 break;
2754 }
2755 message_item = (struct message_item *)cs_queue_item_get (mcast_queue);
2756
		/* token->seq is advanced here, so the token carries the new high seq */
2757 message_item->mcast->seq = ++token->seq;
2758 message_item->mcast->this_seqno = instance->global_seqno++;
2759
2760 /*
2761 * Build IO vector
2762 */
2763 memset (&sort_queue_item, 0, sizeof (struct sort_queue_item));
2766
2768
2769 memcpy (&mcast->ring_id, &instance->my_ring_id, sizeof (struct memb_ring_id));
2770
2771 /*
2772 * Add message to retransmit queue
2773 */
2774 sq_item_add (sort_queue, &sort_queue_item, message_item->mcast->seq);
2775
2777 instance->totemnet_context,
2780
2781 /*
2782 * Delete item from pending queue
2783 */
2784 cs_queue_item_remove (mcast_queue);
2785
2786 /*
2787 * If messages mcasted, deliver any new messages to totempg
2788 */
2789 instance->my_high_seq_received = token->seq;
2790 }
2791
2792 update_aru (instance);
2793
2794 /*
2795 * Return 1 if more messages are available for single node clusters
	 *
	 * NOTE(review): the value actually returned is fcc_mcast_current, the
	 * number of loop iterations completed, not a 0/1 flag.
2796 */
2797 return (fcc_mcast_current);
2798}
2799
2800/*
2801 * Remulticasts messages in orf_token's retransmit list (requires orf_token)
2802 * Modifies orf_token's rtr to include retransmits required by this process
 *
 * *fcc_allowed is reduced by the number of messages remulticast here; that
 * number (instance->fcc_remcast_current) is also the return value.
 *
 * NOTE(review): the assignment of rtr_list, the guard around the debug
 * log block, the decrement/increment of rtr_list_entries and the head of
 * the second for loop are not visible in this listing.
2803 */
2804static int orf_token_rtr (
2805 struct totemsrp_instance *instance,
2806 struct orf_token *orf_token,
2807 unsigned int *fcc_allowed)
2808{
2809 unsigned int res;
2810 unsigned int i, j;
2811 unsigned int found;
2812 struct sq *sort_queue;
2813 struct rtr_item *rtr_list;
2814 unsigned int range = 0;
2815 char retransmit_msg[1024];
2816 char value[64];
2817
2818 if (instance->memb_state == MEMB_STATE_RECOVERY) {
2819 sort_queue = &instance->recovery_sort_queue;
2820 } else {
2821 sort_queue = &instance->regular_sort_queue;
2822 }
2823
2825
	/*
	 * NOTE(review): retransmit_msg is a fixed 1024-byte buffer that is
	 * appended to with strcat once per rtr entry; no length check is
	 * visible here - confirm rtr_list_entries is bounded so it cannot
	 * overflow.
	 */
2826 strcpy (retransmit_msg, "Retransmit List: ");
2829 "Retransmit List %d", orf_token->rtr_list_entries);
2830 for (i = 0; i < orf_token->rtr_list_entries; i++) {
2831 sprintf (value, "%x ", rtr_list[i].seq);
2832 strcat (retransmit_msg, value);
2833 }
2834 strcat (retransmit_msg, "");
2836 "%s", retransmit_msg);
2837 }
2838
2839 /*
2840 * Retransmit messages on orf_token's RTR list from RTR queue
2841 */
	/* i is advanced inside the body only, never in the for header */
2842 for (instance->fcc_remcast_current = 0, i = 0;
2843 instance->fcc_remcast_current < *fcc_allowed && i < orf_token->rtr_list_entries;) {
2844
2845 /*
2846 * If this retransmit request isn't from this configuration,
2847 * try next rtr entry
2848 */
2849 if (memcmp (&rtr_list[i].ring_id, &instance->my_ring_id,
2850 sizeof (struct memb_ring_id)) != 0) {
2851
2852 i += 1;
2853 continue;
2854 }
2855
2856 res = orf_token_remcast (instance, rtr_list[i].seq);
2857 if (res == 0) {
2858 /*
2859 * Multicasted message, so no need to copy to new retransmit list
2860 */
2862 assert (orf_token->rtr_list_entries >= 0);
			/* Close the gap at index i; i is deliberately not advanced */
2863 memmove (&rtr_list[i], &rtr_list[i + 1],
2864 sizeof (struct rtr_item) * (orf_token->rtr_list_entries - i));
2865
2866 instance->stats.mcast_retx++;
2867 instance->fcc_remcast_current++;
2868 } else {
2869 i += 1;
2870 }
2871 }
2872 *fcc_allowed = *fcc_allowed - instance->fcc_remcast_current;
2873
2874 /*
2875 * Add messages to retransmit to RTR list
2876 * but only retry if there is room in the retransmit list
2877 */
2878
2879 range = orf_token->seq - instance->my_aru;
2880 assert (range < QUEUE_RTR_ITEMS_SIZE_MAX);
2881
2883 (i <= range); i++) {
2884
2885 /*
2886 * Ensure message is within the sort queue range
2887 */
2888 res = sq_in_range (sort_queue, instance->my_aru + i);
2889 if (res == 0) {
2890 break;
2891 }
2892
2893 /*
2894 * Find if a message is missing from this processor
2895 */
2896 res = sq_item_inuse (sort_queue, instance->my_aru + i);
2897 if (res == 0) {
2898 /*
2899 * Determine how many times we have missed receiving
2900 * this sequence number. sq_item_miss_count increments
2901 * a counter for the sequence number. The miss count
2902 * will be returned and compared. This allows time for
2903 * delayed multicast messages to be received before
2904 * declaring the message is missing and requesting a
2905 * retransmit.
2906 */
2907 res = sq_item_miss_count (sort_queue, instance->my_aru + i);
2908 if (res < instance->totem_config->miss_count_const) {
2909 continue;
2910 }
2911
2912 /*
2913 * Determine if missing message is already in retransmit list
2914 */
2915 found = 0;
2916 for (j = 0; j < orf_token->rtr_list_entries; j++) {
2917 if (instance->my_aru + i == rtr_list[j].seq) {
2918 found = 1;
2919 }
2920 }
2921 if (found == 0) {
2922 /*
2923 * Missing message not found in current retransmit list so add it
2924 */
2926 &instance->my_ring_id, sizeof (struct memb_ring_id));
2927 rtr_list[orf_token->rtr_list_entries].seq = instance->my_aru + i;
2929 }
2930 }
2931 }
2932 return (instance->fcc_remcast_current);
2933}
2934
/*
 * Passes the saved token copy (orf_token_retransmit, with length
 * orf_token_retransmit_size) to a send call.
 *
 * NOTE(review): the line naming the called function is not visible in
 * this listing - presumably a transport-level token send; confirm.
 */
2935static void token_retransmit (struct totemsrp_instance *instance)
2936{
2938 instance->orf_token_retransmit,
2939 instance->orf_token_retransmit_size);
2940}
2941
2942/*
2943 * Timer callback: if no mcast or token has been received within
2944 * the token retransmit period, retransmit the regular token
2945 * to the next processor and restart the retransmit timer.
 *
 * Nothing is done in GATHER state.
 *
 * NOTE(review): case labels between MEMB_STATE_COMMIT and the
 * token_retransmit call are not visible in this listing.
2946 */
2947static void timer_function_token_retransmit_timeout (void *data)
2948{
2949 struct totemsrp_instance *instance = data;
2950
2951 switch (instance->memb_state) {
2952 case MEMB_STATE_GATHER:
2953 break;
2954 case MEMB_STATE_COMMIT:
2957 token_retransmit (instance);
2958 reset_token_retransmit_timeout (instance); // REVIEWED
2959 break;
2960 }
2961}
2962
/*
 * Timer callback for the token hold retransmit timeout.
 *
 * GATHER and COMMIT states do nothing. The remaining branch calls
 * token_retransmit() without restarting any timer here.
 *
 * NOTE(review): the case labels selecting the token_retransmit branch are
 * not visible in this listing.
 */
2963static void timer_function_token_hold_retransmit_timeout (void *data)
2964{
2965 struct totemsrp_instance *instance = data;
2966
2967 switch (instance->memb_state) {
2968 case MEMB_STATE_GATHER:
2969 break;
2970 case MEMB_STATE_COMMIT:
2971 break;
2974 token_retransmit (instance);
2975 break;
2976 }
2977}
2978
/*
 * Timer callback for merge detection.
 *
 * In the first switch branch a merge-detect message is transmitted only
 * when this node's id equals my_ring_id.rep, i.e. this node is the ring
 * representative. GATHER and COMMIT share a branch that ends in break.
 *
 * NOTE(review): the statement before the switch, the first case label and
 * one line of the GATHER/COMMIT branch are not visible in this listing.
 */
2979static void timer_function_merge_detect_timeout(void *data)
2980{
2981 struct totemsrp_instance *instance = data;
2982
2984
2985 switch (instance->memb_state) {
2987 if (instance->my_ring_id.rep == instance->my_id.nodeid) {
2988 memb_merge_detect_transmit (instance);
2989 }
2990 break;
2991 case MEMB_STATE_GATHER:
2992 case MEMB_STATE_COMMIT:
2994 break;
2995 }
2996}
2997
2998/*
2999 * Send orf_token to next member (requires orf_token)
 *
 * The token, including its rtr_list_entries trailing rtr_item records, is
 * always copied into orf_token_retransmit first. When forward_token is 0
 * the function stops there and returns 0; otherwise the token is passed
 * on and res is returned.
 *
 * NOTE(review): the line naming the send call is not visible in this
 * listing, so whether res is updated by it cannot be confirmed here.
3000 */
3001static int token_send (
3002 struct totemsrp_instance *instance,
3003 struct orf_token *orf_token,
3004 int forward_token)
3005{
3006 int res = 0;
3007 unsigned int orf_token_size;
3008
	/* Wire size = fixed header plus one rtr_item per retransmit entry */
3009 orf_token_size = sizeof (struct orf_token) +
3010 (orf_token->rtr_list_entries * sizeof (struct rtr_item));
3011
3012 orf_token->header.nodeid = instance->my_id.nodeid;
3013 memcpy (instance->orf_token_retransmit, orf_token, orf_token_size);
3014 instance->orf_token_retransmit_size = orf_token_size;
3015 assert (orf_token->header.nodeid);
3016
3017 if (forward_token == 0) {
3018 return (0);
3019 }
3020
3022 orf_token,
3023 orf_token_size);
3024
3025 return (res);
3026}
3027
/*
 * Cancel a token hold.
 *
 * Does nothing when my_token_held is 0. Otherwise clears my_token_held,
 * fills a token_hold_cancel message with the current ring id, counts it
 * in stats.token_hold_cancel_tx and passes it to a send call.
 * Always returns 0.
 *
 * NOTE(review): the token_hold_cancel declaration, its header setup and
 * the line naming the send call are not visible in this listing.
 */
3028static int token_hold_cancel_send (struct totemsrp_instance *instance)
3029{
3031
3032 /*
3033 * Only cancel if the token is currently held
3034 */
3035 if (instance->my_token_held == 0) {
3036 return (0);
3037 }
3038 instance->my_token_held = 0;
3039
3040 /*
3041 * Build message
3042 */
3048 memcpy (&token_hold_cancel.ring_id, &instance->my_ring_id,
3049 sizeof (struct memb_ring_id));
3051
3052 instance->stats.token_hold_cancel_tx++;
3053
3055 sizeof (struct token_hold_cancel));
3056
3057 return (0);
3058}
3059
/*
 * Build an initial orf_token on the stack and forward it via token_send().
 *
 * The token is given this node's id, aru 0, this node as aru_addr, the
 * current ring id and zero fcc/backlog. my_set_retrans_flg ends up 0 when
 * retrans_message_queue is empty and 1 otherwise.
 *
 * Returns the result of token_send().
 *
 * NOTE(review): several field assignments (before nodeid, inside the
 * empty/non-empty branches and before the send) are not visible in this
 * listing.
 */
3060static int orf_token_send_initial (struct totemsrp_instance *instance)
3061{
3062 struct orf_token orf_token;
3063 int res;
3064
3069 orf_token.header.nodeid = instance->my_id.nodeid;
3070 assert (orf_token.header.nodeid);
3074 instance->my_set_retrans_flg = 1;
3075 instance->stats.orf_token_tx++;
3076
	/* The flag set above is overwritten here according to queue contents */
3077 if (cs_queue_is_empty (&instance->retrans_message_queue) == 1) {
3079 instance->my_set_retrans_flg = 0;
3080 } else {
3082 instance->my_set_retrans_flg = 1;
3083 }
3084
3085 orf_token.aru = 0;
3087 orf_token.aru_addr = instance->my_id.nodeid;
3088
3089 memcpy (&orf_token.ring_id, &instance->my_ring_id, sizeof (struct memb_ring_id));
3090 orf_token.fcc = 0;
3091 orf_token.backlog = 0;
3092
3094
3095 res = token_send (instance, &orf_token, 1);
3096
3097 return (res);
3098}
3099
3100static void memb_state_commit_token_update (
3101 struct totemsrp_instance *instance)
3102{
3103 struct srp_addr *addr;
3104 struct memb_commit_token_memb_entry *memb_list;
3105 unsigned int high_aru;
3106 unsigned int i;
3107
3108 addr = (struct srp_addr *)instance->commit_token->end_of_commit_token;
3109 memb_list = (struct memb_commit_token_memb_entry *)(addr + instance->commit_token->addr_entries);
3110
3111 memcpy (instance->my_new_memb_list, addr,
3112 sizeof (struct srp_addr) * instance->commit_token->addr_entries);
3113
3114 instance->my_new_memb_entries = instance->commit_token->addr_entries;
3115
3116 memcpy (&memb_list[instance->commit_token->memb_index].ring_id,
3117 &instance->my_old_ring_id, sizeof (struct memb_ring_id));
3118
3119 memb_list[instance->commit_token->memb_index].aru = instance->old_ring_state_aru;
3120 /*
3121 * TODO high delivered is really instance->my_aru, but with safe this
3122 * could change?
3123 */
3124 instance->my_received_flg =
3125 (instance->my_aru == instance->my_high_seq_received);
3126
3127 memb_list[instance->commit_token->memb_index].received_flg = instance->my_received_flg;
3128
3129 memb_list[instance->commit_token->memb_index].high_delivered = instance->my_high_delivered;
3130 /*
3131 * find high aru up to current memb_index for all matching ring ids
3132 * if any ring id matching memb_index has aru less then high aru set
3133 * received flag for that entry to false
3134 */
3135 high_aru = memb_list[instance->commit_token->memb_index].aru;
3136 for (i = 0; i <= instance->commit_token->memb_index; i++) {
3137 if (memcmp (&memb_list[instance->commit_token->memb_index].ring_id,
3138 &memb_list[i].ring_id,
3139 sizeof (struct memb_ring_id)) == 0) {
3140
3141 if (sq_lt_compare (high_aru, memb_list[i].aru)) {
3142 high_aru = memb_list[i].aru;
3143 }
3144 }
3145 }
3146
3147 for (i = 0; i <= instance->commit_token->memb_index; i++) {
3148 if (memcmp (&memb_list[instance->commit_token->memb_index].ring_id,
3149 &memb_list[i].ring_id,
3150 sizeof (struct memb_ring_id)) == 0) {
3151
3152 if (sq_lt_compare (memb_list[i].aru, high_aru)) {
3153 memb_list[i].received_flg = 0;
3154 if (i == instance->commit_token->memb_index) {
3155 instance->my_received_flg = 0;
3156 }
3157 }
3158 }
3159 }
3160
3161 instance->commit_token->header.nodeid = instance->my_id.nodeid;
3162 instance->commit_token->memb_index += 1;
3163 assert (instance->commit_token->memb_index <= instance->commit_token->addr_entries);
3164 assert (instance->commit_token->header.nodeid);
3165}
3166
/*
 * Select the destination for the commit token.
 *
 * The node id is taken from the token's address list at index
 * memb_index modulo addr_entries, so the index wraps back to the first
 * entry after the last one.
 *
 * NOTE(review): the line naming the call that receives the node id is not
 * visible in this listing.
 */
3167static void memb_state_commit_token_target_set (
3168 struct totemsrp_instance *instance)
3169{
3170 struct srp_addr *addr;
3171
3172 addr = (struct srp_addr *)instance->commit_token->end_of_commit_token;
3173
3174 /* Totemnet just looks at the node id */
3176 instance->totemnet_context,
3177 addr[instance->commit_token->memb_index %
3178 instance->commit_token->addr_entries].nodeid);
3179}
3180
/*
 * Send the caller-supplied commit token.
 *
 * Bumps token_seq, signs the token with this node's id, saves a copy in
 * orf_token_retransmit for later retransmission, counts the transmit and
 * restarts the token retransmit timeout. Always returns 0.
 *
 * NOTE(review): the line naming the send call is not visible in this
 * listing.
 */
3181static int memb_state_commit_token_send_recovery (
3182 struct totemsrp_instance *instance,
3183 struct memb_commit_token *commit_token)
3184{
3185 unsigned int commit_token_size;
3186
3187 commit_token->token_seq++;
3188 commit_token->header.nodeid = instance->my_id.nodeid;
	/* Size = fixed part + one srp_addr and one memb entry per member */
3189 commit_token_size = sizeof (struct memb_commit_token) +
3190 ((sizeof (struct srp_addr) +
3191 sizeof (struct memb_commit_token_memb_entry)) * commit_token->addr_entries);
3192 /*
3193 * Make a copy for retransmission if necessary
3194 */
3195 memcpy (instance->orf_token_retransmit, commit_token, commit_token_size);
3196 instance->orf_token_retransmit_size = commit_token_size;
3197
3198 instance->stats.memb_commit_token_tx++;
3199
3201 commit_token,
3202 commit_token_size);
3203
3204 /*
3205 * Request retransmission of the commit token in case it is lost
3206 */
3207 reset_token_retransmit_timeout (instance);
3208 return (0);
3209}
3210
/*
 * Send the instance's own commit token (instance->commit_token).
 *
 * Bumps token_seq, signs the token with this node's id, saves a copy in
 * orf_token_retransmit for later retransmission, counts the transmit and
 * restarts the token retransmit timeout. Always returns 0.
 *
 * NOTE(review): the line naming the send call is not visible in this
 * listing.
 */
3211static int memb_state_commit_token_send (
3212 struct totemsrp_instance *instance)
3213{
3214 unsigned int commit_token_size;
3215
3216 instance->commit_token->token_seq++;
3217 instance->commit_token->header.nodeid = instance->my_id.nodeid;
	/* Size = fixed part + one srp_addr and one memb entry per member */
3218 commit_token_size = sizeof (struct memb_commit_token) +
3219 ((sizeof (struct srp_addr) +
3220 sizeof (struct memb_commit_token_memb_entry)) * instance->commit_token->addr_entries);
3221 /*
3222 * Make a copy for retransmission if necessary
3223 */
3224 memcpy (instance->orf_token_retransmit, instance->commit_token, commit_token_size);
3225 instance->orf_token_retransmit_size = commit_token_size;
3226
3227 instance->stats.memb_commit_token_tx++;
3228
3230 instance->commit_token,
3231 commit_token_size);
3232
3233 /*
3234 * Request retransmission of the commit token in case it is lost
3235 */
3236 reset_token_retransmit_timeout (instance);
3237 return (0);
3238}
3239
3240
3241static int memb_lowest_in_config (struct totemsrp_instance *instance)
3242{
3243 struct srp_addr token_memb[PROCESSOR_COUNT_MAX];
3244 int token_memb_entries = 0;
3245 int i;
3246 unsigned int lowest_nodeid;
3247
3248 memb_set_subtract (token_memb, &token_memb_entries,
3249 instance->my_proc_list, instance->my_proc_list_entries,
3250 instance->my_failed_list, instance->my_failed_list_entries);
3251
3252 /*
3253 * find representative by searching for smallest identifier
3254 */
3255 assert(token_memb_entries > 0);
3256
3257 lowest_nodeid = token_memb[0].nodeid;
3258 for (i = 1; i < token_memb_entries; i++) {
3259 if (lowest_nodeid > token_memb[i].nodeid) {
3260 lowest_nodeid = token_memb[i].nodeid;
3261 }
3262 }
3263 return (lowest_nodeid == instance->my_id.nodeid);
3264}
3265
3266static int srp_addr_compare (const void *a, const void *b)
3267{
3268 const struct srp_addr *srp_a = (const struct srp_addr *)a;
3269 const struct srp_addr *srp_b = (const struct srp_addr *)b;
3270
3271 if (srp_a->nodeid < srp_b->nodeid) {
3272 return -1;
3273 } else if (srp_a->nodeid > srp_b->nodeid) {
3274 return 1;
3275 } else {
3276 return 0;
3277 }
3278}
3279
/*
 * Build a fresh commit token in instance->commit_token.
 *
 * The member list is the result of memb_set_subtract() on my_proc_list
 * and my_failed_list, sorted by node id. The token's ring id uses this
 * node as rep and token_ring_id_seq + 4 as seq; memb_index starts at 0
 * and the per-member entries that follow the address list are zeroed.
 *
 * NOTE(review): the log call head and some header field assignments are
 * not visible in this listing.
 */
3280static void memb_state_commit_token_create (
3281 struct totemsrp_instance *instance)
3282{
3283 struct srp_addr token_memb[PROCESSOR_COUNT_MAX];
3284 struct srp_addr *addr;
3285 struct memb_commit_token_memb_entry *memb_list;
3286 int token_memb_entries = 0;
3287
3289 "Creating commit token because I am the rep.");
3290
3291 memb_set_subtract (token_memb, &token_memb_entries,
3292 instance->my_proc_list, instance->my_proc_list_entries,
3293 instance->my_failed_list, instance->my_failed_list_entries);
3294
3295 memset (instance->commit_token, 0, sizeof (struct memb_commit_token));
3299 instance->commit_token->header.encapsulated = 0;
3300 instance->commit_token->header.nodeid = instance->my_id.nodeid;
3301 assert (instance->commit_token->header.nodeid);
3302
3303 instance->commit_token->ring_id.rep = instance->my_id.nodeid;
3304 instance->commit_token->ring_id.seq = instance->token_ring_id_seq + 4;
3305
3306 /*
3307 * This qsort is necessary to ensure the commit token traverses
3308 * the ring in the proper order
3309 */
3310 qsort (token_memb, token_memb_entries, sizeof (struct srp_addr),
3311 srp_addr_compare);
3312
3313 instance->commit_token->memb_index = 0;
3314 instance->commit_token->addr_entries = token_memb_entries;
3315
	/* Address list starts at end_of_commit_token; memb entries follow it */
3316 addr = (struct srp_addr *)instance->commit_token->end_of_commit_token;
3317 memb_list = (struct memb_commit_token_memb_entry *)(addr + instance->commit_token->addr_entries);
3318
3319 memcpy (addr, token_memb,
3320 token_memb_entries * sizeof (struct srp_addr));
3321 memset (memb_list, 0,
3322 sizeof (struct memb_commit_token_memb_entry) * token_memb_entries);
3323}
3324
/*
 * Build a join message on the stack and hand it to the send call.
 *
 * The message is a struct memb_join followed by my_proc_list and then
 * my_failed_list. If it would not fit in the 40000-byte local buffer the
 * function returns without sending anything.
 *
 * NOTE(review): the header setup, the log call head, the assignments of
 * the list entry counts and the line naming the send call are not visible
 * in this listing.
 */
3325static void memb_join_message_send (struct totemsrp_instance *instance)
3326{
3327 char memb_join_data[40000];
3328 struct memb_join *memb_join = (struct memb_join *)memb_join_data;
3329 char *addr;
3330 unsigned int addr_idx;
3331 size_t msg_len;
3332
3337 memb_join->header.nodeid = instance->my_id.nodeid;
3338 assert (memb_join->header.nodeid);
3339
3340 msg_len = sizeof(struct memb_join) +
3341 ((instance->my_proc_list_entries + instance->my_failed_list_entries) * sizeof(struct srp_addr));
3342
3343 if (msg_len > sizeof(memb_join_data)) {
3345 "memb_join_message too long. Ignoring message.");
3346
3347 return ;
3348 }
3349
3350 memb_join->ring_seq = instance->my_ring_id.seq;
3353 memb_join->system_from = instance->my_id;
3354
3355 /*
3356 * This mess adds the joined and failed processor lists into the join
3357 * message
3358 */
3359 addr = (char *)memb_join;
3360 addr_idx = sizeof (struct memb_join);
3361 memcpy (&addr[addr_idx],
3362 instance->my_proc_list,
3363 instance->my_proc_list_entries *
3364 sizeof (struct srp_addr));
3365 addr_idx +=
3366 instance->my_proc_list_entries *
3367 sizeof (struct srp_addr);
3368 memcpy (&addr[addr_idx],
3369 instance->my_failed_list,
3370 instance->my_failed_list_entries *
3371 sizeof (struct srp_addr));
3372 addr_idx +=
3373 instance->my_failed_list_entries *
3374 sizeof (struct srp_addr);
3375
	/*
	 * When send_join_timeout is nonzero, block for a random delay of less
	 * than send_join_timeout * 1000 microseconds before sending.
	 */
3376 if (instance->totem_config->send_join_timeout) {
3377 usleep (random() % (instance->totem_config->send_join_timeout * 1000));
3378 }
3379
3380 instance->stats.memb_join_tx++;
3381
3383 instance->totemnet_context,
3384 memb_join,
3385 addr_idx);
3386}
3387
/*
 * Build and send a join message announcing that this node is leaving.
 *
 * This node is merged into my_failed_list, and the proc list placed in
 * the message is my_proc_list with this node subtracted. If the message
 * would not fit in the 40000-byte local buffer the function returns
 * without sending anything. The transmit is counted in
 * stats.memb_join_tx.
 *
 * NOTE(review): the log call heads, the header setup, the assignment of
 * the failed list entry count and the line naming the send call are not
 * visible in this listing.
 */
3388static void memb_leave_message_send (struct totemsrp_instance *instance)
3389{
3390 char memb_join_data[40000];
3391 struct memb_join *memb_join = (struct memb_join *)memb_join_data;
3392 char *addr;
3393 unsigned int addr_idx;
3394 int active_memb_entries;
3395 struct srp_addr active_memb[PROCESSOR_COUNT_MAX];
3396 size_t msg_len;
3397
3399 "sending join/leave message");
3400
3401 /*
3402 * add us to the failed list, and remove us from
3403 * the members list
3404 */
3405 memb_set_merge(
3406 &instance->my_id, 1,
3407 instance->my_failed_list, &instance->my_failed_list_entries);
3408
3409 memb_set_subtract (active_memb, &active_memb_entries,
3410 instance->my_proc_list, instance->my_proc_list_entries,
3411 &instance->my_id, 1);
3412
3413 msg_len = sizeof(struct memb_join) +
3414 ((active_memb_entries + instance->my_failed_list_entries) * sizeof(struct srp_addr));
3415
3416 if (msg_len > sizeof(memb_join_data)) {
3418 "memb_leave message too long. Ignoring message.");
3419
3420 return ;
3421 }
3422
3428
3429 memb_join->ring_seq = instance->my_ring_id.seq;
3430 memb_join->proc_list_entries = active_memb_entries;
3432 memb_join->system_from = instance->my_id;
3433
3434 // TODO: CC Maybe use the actual join send routine.
3435 /*
3436 * This mess adds the joined and failed processor lists into the join
3437 * message
3438 */
3439 addr = (char *)memb_join;
3440 addr_idx = sizeof (struct memb_join);
3441 memcpy (&addr[addr_idx],
3442 active_memb,
3443 active_memb_entries *
3444 sizeof (struct srp_addr));
3445 addr_idx +=
3446 active_memb_entries *
3447 sizeof (struct srp_addr);
3448 memcpy (&addr[addr_idx],
3449 instance->my_failed_list,
3450 instance->my_failed_list_entries *
3451 sizeof (struct srp_addr));
3452 addr_idx +=
3453 instance->my_failed_list_entries *
3454 sizeof (struct srp_addr);
3455
3456
	/*
	 * When send_join_timeout is nonzero, block for a random delay of less
	 * than send_join_timeout * 1000 microseconds before sending.
	 */
3457 if (instance->totem_config->send_join_timeout) {
3458 usleep (random() % (instance->totem_config->send_join_timeout * 1000));
3459 }
3460 instance->stats.memb_join_tx++;
3461
3463 instance->totemnet_context,
3464 memb_join,
3465 addr_idx);
3466}
3467
/*
 * Fill a memb_merge_detect message with the current ring id, count it in
 * stats.memb_merge_detect_tx and pass it to a send call.
 *
 * NOTE(review): the message declaration, its header setup and the line
 * naming the send call are not visible in this listing.
 */
3468static void memb_merge_detect_transmit (struct totemsrp_instance *instance)
3469{
3471
3478 memcpy (&memb_merge_detect.ring_id, &instance->my_ring_id,
3479 sizeof (struct memb_ring_id));
3481
3482 instance->stats.memb_merge_detect_tx++;
3485 sizeof (struct memb_merge_detect));
3486}
3487
3488static void memb_ring_id_set (
3489 struct totemsrp_instance *instance,
3490 const struct memb_ring_id *ring_id)
3491{
3492
3493 memcpy (&instance->my_ring_id, ring_id, sizeof (struct memb_ring_id));
3494}
3495
/*
 * Register a token callback.
 *
 * A token_callback_instance is allocated with malloc, filled from the
 * arguments and linked into either the received or the sent callback list
 * of the instance, depending on type. The new handle is stored in
 * *handle_out. token_hold_cancel_send() is called first in every case.
 *
 * Returns 0 on success, -1 when the allocation fails (*handle_out is not
 * written in that case).
 *
 * NOTE(review): the declaration line, the type parameter and the case
 * labels of the switch are not visible in this listing.
 */
3497 void *srp_context,
3498 void **handle_out,
3500 int delete,
3501 int (*callback_fn) (enum totem_callback_token_type type, const void *),
3502 const void *data)
3503{
3504 struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
3505 struct token_callback_instance *callback_handle;
3506
3507 token_hold_cancel_send (instance);
3508
3509 callback_handle = malloc (sizeof (struct token_callback_instance));
3510 if (callback_handle == 0) {
3511 return (-1);
3512 }
3513 *handle_out = (void *)callback_handle;
3514 qb_list_init (&callback_handle->list);
3515 callback_handle->callback_fn = callback_fn;
	/* The const qualifier on data is cast away for storage */
3516 callback_handle->data = (void *) data;
3517 callback_handle->callback_type = type;
3518 callback_handle->delete = delete;
3519 switch (type) {
3521 qb_list_add (&callback_handle->list, &instance->token_callback_received_listhead);
3522 break;
3524 qb_list_add (&callback_handle->list, &instance->token_callback_sent_listhead);
3525 break;
3526 }
3527
3528 return (0);
3529}
3530
3531void totemsrp_callback_token_destroy (void *srp_context, void **handle_out)
3532{
3533 struct token_callback_instance *h;
3534
3535 if (*handle_out) {
3536 h = (struct token_callback_instance *)*handle_out;
3537 qb_list_del (&h->list);
3538 free (h);
3539 h = NULL;
3540 *handle_out = 0;
3541 }
3542}
3543
/*
 * Walk the received or sent token callback list, chosen by type.
 *
 * An entry whose del value is 1 is unlinked before its callback result is
 * examined; if the result is -1 it is put back on the list so it is tried
 * again on the next token. Any other type value trips assert (0).
 *
 * NOTE(review): the type parameter, the token_callback_instance
 * declaration, the case labels, the assignments of del and res and the
 * body of the final "else if (del)" branch are not visible in this
 * listing.
 */
3544static void token_callbacks_execute (
3545 struct totemsrp_instance *instance,
3547{
3548 struct qb_list_head *list, *tmp_iter;
3549 struct qb_list_head *callback_listhead = 0;
3551 int res;
3552 int del;
3553
3554 switch (type) {
3556 callback_listhead = &instance->token_callback_received_listhead;
3557 break;
3559 callback_listhead = &instance->token_callback_sent_listhead;
3560 break;
3561 default:
3562 assert (0);
3563 }
3564
	/* The _safe iterator is used because entries are unlinked inside the loop */
3565 qb_list_for_each_safe(list, tmp_iter, callback_listhead) {
3566 token_callback_instance = qb_list_entry (list, struct token_callback_instance, list);
3568 if (del == 1) {
3569 qb_list_del (list);
3570 }
3571
3575 /*
3576 * This callback failed to execute, try it again on the next token
3577 */
3578 if (res == -1 && del == 1) {
3579 qb_list_add (list, callback_listhead);
3580 } else if (del) {
3582 }
3583 }
3584}
3585
3586/*
3587 * Flow control functions
3588 */
3589static unsigned int backlog_get (struct totemsrp_instance *instance)
3590{
3591 unsigned int backlog = 0;
3592 struct cs_queue *queue_use = NULL;
3593
3594 if (instance->memb_state == MEMB_STATE_OPERATIONAL) {
3595 if (instance->waiting_trans_ack) {
3596 queue_use = &instance->new_message_queue_trans;
3597 } else {
3598 queue_use = &instance->new_message_queue;
3599 }
3600 } else
3601 if (instance->memb_state == MEMB_STATE_RECOVERY) {
3602 queue_use = &instance->retrans_message_queue;
3603 }
3604
3605 if (queue_use != NULL) {
3606 backlog = cs_queue_used (queue_use);
3607 }
3608
3609 instance->stats.token[instance->stats.latest_token].backlog_calc = backlog;
3610 return (backlog);
3611}
3612
3613static int fcc_calculate (
3614 struct totemsrp_instance *instance,
3615 struct orf_token *token)
3616{
3617 unsigned int transmits_allowed;
3618 unsigned int backlog_calc;
3619
3620 transmits_allowed = instance->totem_config->max_messages;
3621
3622 if (transmits_allowed > instance->totem_config->window_size - token->fcc) {
3623 transmits_allowed = instance->totem_config->window_size - token->fcc;
3624 }
3625
3626 instance->my_cbl = backlog_get (instance);
3627
3628 /*
3629 * Only do backlog calculation if there is a backlog otherwise
3630 * we would result in div by zero
3631 */
3632 if (token->backlog + instance->my_cbl - instance->my_pbl) {
3633 backlog_calc = (instance->totem_config->window_size * instance->my_pbl) /
3634 (token->backlog + instance->my_cbl - instance->my_pbl);
3635 if (backlog_calc > 0 && transmits_allowed > backlog_calc) {
3636 transmits_allowed = backlog_calc;
3637 }
3638 }
3639
3640 return (transmits_allowed);
3641}
3642
3643/*
3644 * don't overflow the RTR sort queue
3645 */
3646static void fcc_rtr_limit (
3647 struct totemsrp_instance *instance,
3648 struct orf_token *token,
3649 unsigned int *transmits_allowed)
3650{
3651 int check = QUEUE_RTR_ITEMS_SIZE_MAX;
3652 check -= (*transmits_allowed + instance->totem_config->window_size);
3653 assert (check >= 0);
3654 if (sq_lt_compare (instance->last_released +
3655 QUEUE_RTR_ITEMS_SIZE_MAX - *transmits_allowed -
3656 instance->totem_config->window_size,
3657
3658 token->seq)) {
3659
3660 *transmits_allowed = 0;
3661 }
3662}
3663
3664static void fcc_token_update (
3665 struct totemsrp_instance *instance,
3666 struct orf_token *token,
3667 unsigned int msgs_transmitted)
3668{
3669 token->fcc += msgs_transmitted - instance->my_trc;
3670 token->backlog += instance->my_cbl - instance->my_pbl;
3671 instance->my_trc = msgs_transmitted;
3672 instance->my_pbl = instance->my_cbl;
3673}
3674
3675/*
3676 * Sanity checkers
3677 */
/*
 * Length check for a received ORF token.
 *
 * The message must hold a full struct orf_token and, after it,
 * rtr_list_entries retransmit items (the count is byte swapped first when
 * endian_conversion_needed is set).
 *
 * Returns 0 when the message is long enough, -1 when it must be ignored.
 *
 * NOTE(review): rtr_entries is a signed int taken from the wire and no
 * upper bound is applied here - confirm that callers do not rely on it
 * being <= RETRANSMIT_ENTRIES_MAX.
 */
3678static int check_orf_token_sanity(
3679 const struct totemsrp_instance *instance,
3680 const void *msg,
3681 size_t msg_len,
3682 int endian_conversion_needed)
3683{
3684 int rtr_entries;
3685 const struct orf_token *token = (const struct orf_token *)msg;
3686 size_t required_len;
3687
3688 if (msg_len < sizeof(struct orf_token)) {
3690 "Received orf_token message is too short... ignoring.");
3691
3692 return (-1);
3693 }
3694
3695 if (endian_conversion_needed) {
3696 rtr_entries = swab32(token->rtr_list_entries);
3697 } else {
3698 rtr_entries = token->rtr_list_entries;
3699 }
3700
3701 required_len = sizeof(struct orf_token) + rtr_entries * sizeof(struct rtr_item);
3702 if (msg_len < required_len) {
3704 "Received orf_token message is too short... ignoring.");
3705
3706 return (-1);
3707 }
3708
3709 return (0);
3710}
3711
/*
 * Length check for a received mcast message: it must be at least as long
 * as struct mcast. endian_conversion_needed is not used by the visible code.
 *
 * Returns 0 when the message is long enough, -1 when it must be ignored.
 */
3712static int check_mcast_sanity(
3713 struct totemsrp_instance *instance,
3714 const void *msg,
3715 size_t msg_len,
3716 int endian_conversion_needed)
3717{
3718
3719 if (msg_len < sizeof(struct mcast)) {
3721 "Received mcast message is too short... ignoring.");
3722
3723 return (-1);
3724 }
3725
3726 return (0);
3727}
3728
/*
 * Length check for a received memb_merge_detect message: it must be at
 * least as long as struct memb_merge_detect. endian_conversion_needed is
 * not used by the visible code.
 *
 * Returns 0 when the message is long enough, -1 when it must be ignored.
 */
3729static int check_memb_merge_detect_sanity(
3730 struct totemsrp_instance *instance,
3731 const void *msg,
3732 size_t msg_len,
3733 int endian_conversion_needed)
3734{
3735
3736 if (msg_len < sizeof(struct memb_merge_detect)) {
3738 "Received memb_merge_detect message is too short... ignoring.");
3739
3740 return (-1);
3741 }
3742
3743 return (0);
3744}
3745
/*
 * Length check for a received memb_join message.
 *
 * The message must hold a full struct memb_join followed by
 * (proc_list_entries + failed_list_entries) srp_addr records.
 *
 * Returns 0 when the message is long enough, -1 when it must be ignored.
 *
 * NOTE(review): the statements that load proc_list_entries and
 * failed_list_entries from mj_msg, and the body of the
 * endian_conversion_needed branch, are not visible in this view - confirm
 * against the full source.
 */
3746static int check_memb_join_sanity(
3747 struct totemsrp_instance *instance,
3748 const void *msg,
3749 size_t msg_len,
3750 int endian_conversion_needed)
3751{
3752 const struct memb_join *mj_msg = (const struct memb_join *)msg;
3753 unsigned int proc_list_entries;
3754 unsigned int failed_list_entries;
3755 size_t required_len;
3756
3757 if (msg_len < sizeof(struct memb_join)) {
3759 "Received memb_join message is too short... ignoring.");
3760
3761 return (-1);
3762 }
3763
3766
3767 if (endian_conversion_needed) {
3770 }
3771
3772 required_len = sizeof(struct memb_join) + ((proc_list_entries + failed_list_entries) * sizeof(struct srp_addr));
3773 if (msg_len < required_len) {
3775 "Received memb_join message is too short... ignoring.");
3776
3777 return (-1);
3778 }
3779
3780 return (0);
3781}
3782
3783static int check_memb_commit_token_sanity(
3784 struct totemsrp_instance *instance,
3785 const void *msg,
3786 size_t msg_len,
3787 int endian_conversion_needed)
3788{
3789 const struct memb_commit_token *mct_msg = (const struct memb_commit_token *)msg;
3790 unsigned int addr_entries;
3791 size_t required_len;
3792
3793 if (msg_len < sizeof(struct memb_commit_token)) {
3795 "Received memb_commit_token message is too short... ignoring.");
3796
3797 return (0);
3798 }
3799
3800 addr_entries= mct_msg->addr_entries;
3801 if (endian_conversion_needed) {
3803 }
3804
3805 required_len = sizeof(struct memb_commit_token) +
3806 (addr_entries * (sizeof(struct srp_addr) + sizeof(struct memb_commit_token_memb_entry)));
3807 if (msg_len < required_len) {
3809 "Received memb_commit_token message is too short... ignoring.");
3810
3811 return (-1);
3812 }
3813
3814 return (0);
3815}
3816
/*
 * Length check for a received token_hold_cancel message: it must be at
 * least as long as struct token_hold_cancel. endian_conversion_needed is
 * not used by the visible code.
 *
 * Returns 0 when the message is long enough, -1 when it must be ignored.
 */
3817static int check_token_hold_cancel_sanity(
3818 struct totemsrp_instance *instance,
3819 const void *msg,
3820 size_t msg_len,
3821 int endian_conversion_needed)
3822{
3823
3824 if (msg_len < sizeof(struct token_hold_cancel)) {
3826 "Received token_hold_cancel message is too short... ignoring.");
3827
3828 return (-1);
3829 }
3830
3831 return (0);
3832}
3833
3834/*
3835 * Message Handlers
3836 */
3837
/*
 * Timestamp of the previous measurement taken by the GIVEINFO timing code
 * in message_handler_orf_token (only read and written under #ifdef GIVEINFO).
 */
3838unsigned long long int tv_old;
3839/*
3840 * message handler called when TOKEN message type received
3841 */
/*
 * Outline of the visible code:
 *  1. drop the token if it fails the length check or orf_token_discard is set
 *  2. byte swap it if needed and take a private copy in token_storage
 *  3. track whether token->seq changed since the last token (merge detect
 *     and token hold timers, my_seq_unchanged)
 *  4. decide whether the token is to be held (my_token_held, forward_token)
 *  5. depending on memb_state: discard, or validate ring id and token_seq,
 *     run the "received" callbacks, retransmit and multicast under flow
 *     control, update aru, detect "failed to receive", progress recovery,
 *     send the token on, deliver messages, restart timers and run the
 *     "sent" callbacks
 *  6. reset or cancel the heartbeat timeout
 *
 * Every visible return statement returns 0.
 *
 * NOTE(review): some lines of this function (several log_printf openers,
 * two case labels, the statement between flushing = 1 / flushing = 0 and
 * the first line of one condition) are not visible in this view.
 */
3842static int message_handler_orf_token (
3843 struct totemsrp_instance *instance,
3844 const void *msg,
3845 size_t msg_len,
3846 int endian_conversion_needed)
3847{
3848 char token_storage[1500];
3849 char token_convert[1500];
3850 struct orf_token *token = NULL;
3851 int forward_token;
3852 unsigned int transmits_allowed;
3853 unsigned int mcasted_retransmit;
3854 unsigned int mcasted_regular;
3855 unsigned int last_aru;
3856
3857#ifdef GIVEINFO
3858 unsigned long long tv_current;
3859 unsigned long long tv_diff;
3860
3861 tv_current = qb_util_nano_current_get ();
3862 tv_diff = tv_current - tv_old;
3863 tv_old = tv_current;
3864
3866 "Time since last token %0.4f ms", ((float)tv_diff) / 1000000.0);
3867#endif
3868
3869 if (check_orf_token_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
3870 return (0);
3871 }
3872
3873 if (instance->orf_token_discard) {
3874 return (0);
3875 }
3876#ifdef TEST_DROP_ORF_TOKEN_PERCENTAGE
3877 if (random()%100 < TEST_DROP_ORF_TOKEN_PERCENTAGE) {
3878 return (0);
3879 }
3880#endif
3881
3882 if (endian_conversion_needed) {
3883 orf_token_endian_convert ((struct orf_token *)msg,
3884 (struct orf_token *)token_convert);
3885 msg = (struct orf_token *)token_convert;
3886 }
3887
3888 /*
3889 * Make copy of token and retransmit list in case we have
3890 * to flush incoming messages from the kernel queue
3891 */
/*
 * NOTE(review): the second memcpy always copies RETRANSMIT_ENTRIES_MAX
 * rtr items, while the sanity check above only guarantees
 * rtr_list_entries items are present in msg. Confirm that the source
 * buffer is always large enough and that
 * sizeof (struct orf_token) + RETRANSMIT_ENTRIES_MAX * sizeof (struct rtr_item)
 * fits into the 1500 byte token_storage / token_convert buffers.
 */
3892 token = (struct orf_token *)token_storage;
3893 memcpy (token, msg, sizeof (struct orf_token));
3894 memcpy (&token->rtr_list[0], (char *)msg + sizeof (struct orf_token),
3895 sizeof (struct rtr_item) * RETRANSMIT_ENTRIES_MAX);
3896
3897
3898 /*
3899 * Handle merge detection timeout
3900 */
3901 if (token->seq == instance->my_last_seq) {
3902 start_merge_detect_timeout (instance);
3903 instance->my_seq_unchanged += 1;
3904 } else {
3905 cancel_merge_detect_timeout (instance);
3906 cancel_token_hold_retransmit_timeout (instance);
3907 instance->my_seq_unchanged = 0;
3908 }
3909
3910 instance->my_last_seq = token->seq;
3911
3912#ifdef TEST_RECOVERY_MSG_COUNT
3913 if (instance->memb_state == MEMB_STATE_OPERATIONAL && token->seq > TEST_RECOVERY_MSG_COUNT) {
3914 return (0);
3915 }
3916#endif
3917 instance->flushing = 1;
3919 instance->flushing = 0;
3920
3921 /*
3922 * Determine if we should hold (in reality drop) the token
3923 */
/*
 * The ring representative needs my_seq_unchanged to be strictly greater
 * than seqno_unchanged_const, every other node holds as soon as it is
 * greater or equal.
 */
3924 instance->my_token_held = 0;
3925 if (instance->my_ring_id.rep == instance->my_id.nodeid &&
3926 instance->my_seq_unchanged > instance->totem_config->seqno_unchanged_const) {
3927 instance->my_token_held = 1;
3928 } else {
3929 if (instance->my_ring_id.rep != instance->my_id.nodeid &&
3930 instance->my_seq_unchanged >= instance->totem_config->seqno_unchanged_const) {
3931 instance->my_token_held = 1;
3932 }
3933 }
3934
3935 /*
3936 * Hold onto token when there is no activity on ring and
3937 * this processor is the ring rep
3938 */
3939 forward_token = 1;
3940 if (instance->my_ring_id.rep == instance->my_id.nodeid) {
3941 if (instance->my_token_held) {
3942 forward_token = 0;
3943 }
3944 }
3945
3946 switch (instance->memb_state) {
3947 case MEMB_STATE_COMMIT:
3948 /* Discard token */
3949 break;
3950
3952 messages_free (instance, token->aru);
3953 /*
3954 * Do NOT add break, this case should also execute code in gather case.
3955 */
3956
3957 case MEMB_STATE_GATHER:
3958 /*
3959 * DO NOT add break, we use different free mechanism in recovery state
3960 */
3961
3963 /*
3964 * Discard tokens from another configuration
3965 */
3966 if (memcmp (&token->ring_id, &instance->my_ring_id,
3967 sizeof (struct memb_ring_id)) != 0) {
3968
3969 if ((forward_token)
3970 && instance->use_heartbeat) {
3971 reset_heartbeat_timeout(instance);
3972 }
3973 else {
3974 cancel_heartbeat_timeout(instance);
3975 }
3976
3977 return (0); /* discard token */
3978 }
3979
3980 /*
3981 * Discard retransmitted tokens
3982 */
3983 if (sq_lte_compare (token->token_seq, instance->my_token_seq)) {
3984 return (0); /* discard token */
3985 }
3986
3987 /*
3988 * Token is valid so trigger callbacks
3989 */
3990 token_callbacks_execute (instance, TOTEM_CALLBACK_TOKEN_RECEIVED);
3991
3992 last_aru = instance->my_last_aru;
3993 instance->my_last_aru = token->aru;
3994
3995 transmits_allowed = fcc_calculate (instance, token);
3996 mcasted_retransmit = orf_token_rtr (instance, token, &transmits_allowed);
3997
3999 instance->my_token_held == 1 &&
4000 (token->rtr_list_entries > 0 || mcasted_retransmit > 0)) {
4001 instance->my_token_held = 0;
4002 forward_token = 1;
4003 }
4004
4005 fcc_rtr_limit (instance, token, &transmits_allowed);
4006 mcasted_regular = orf_token_mcast (instance, token, transmits_allowed);
4007/*
4008if (mcasted_regular) {
4009printf ("mcasted regular %d\n", mcasted_regular);
4010printf ("token seq %d\n", token->seq);
4011}
4012*/
4013 fcc_token_update (instance, token, mcasted_retransmit +
4014 mcasted_regular);
4015
4016 if (sq_lt_compare (instance->my_aru, token->aru) ||
4017 instance->my_id.nodeid == token->aru_addr ||
4018 token->aru_addr == 0) {
4019
4020 token->aru = instance->my_aru;
4021 if (token->aru == token->seq) {
4022 token->aru_addr = 0;
4023 } else {
4024 token->aru_addr = instance->my_id.nodeid;
4025 }
4026 }
4027 if (token->aru == last_aru && token->aru_addr != 0) {
4028 instance->my_aru_count += 1;
4029 } else {
4030 instance->my_aru_count = 0;
4031 }
4032
4033 /*
4034 * We really don't follow specification there. In specification, OTHER nodes
4035 * detect failure of one node (based on aru_count) and my_id IS NEVER added
4036 * to failed list (so node never mark itself as failed)
4037 */
4038 if (instance->my_aru_count > instance->totem_config->fail_to_recv_const &&
4039 token->aru_addr == instance->my_id.nodeid) {
4040
4042 "FAILED TO RECEIVE");
4043
4044 instance->failed_to_recv = 1;
4045
4046 memb_set_merge (&instance->my_id, 1,
4047 instance->my_failed_list,
4048 &instance->my_failed_list_entries);
4049
4050 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_FAILED_TO_RECEIVE);
4051 } else {
4052 instance->my_token_seq = token->token_seq;
4053 token->token_seq += 1;
4054
4055 if (instance->memb_state == MEMB_STATE_RECOVERY) {
4056 /*
4057 * instance->my_aru == instance->my_high_seq_received means this processor
4058 * has recovered all messages it can recover
4059 * (ie: its retrans queue is empty)
4060 */
4061 if (cs_queue_is_empty (&instance->retrans_message_queue) == 0) {
4062
4063 if (token->retrans_flg == 0) {
4064 token->retrans_flg = 1;
4065 instance->my_set_retrans_flg = 1;
4066 }
4067 } else
4068 if (token->retrans_flg == 1 && instance->my_set_retrans_flg) {
4069 token->retrans_flg = 0;
4070 instance->my_set_retrans_flg = 0;
4071 }
4073 "token retrans flag is %d my set retrans flag%d retrans queue empty %d count %d, aru %x",
4074 token->retrans_flg, instance->my_set_retrans_flg,
4075 cs_queue_is_empty (&instance->retrans_message_queue),
4076 instance->my_retrans_flg_count, token->aru);
4077 if (token->retrans_flg == 0) {
4078 instance->my_retrans_flg_count += 1;
4079 } else {
4080 instance->my_retrans_flg_count = 0;
4081 }
4082 if (instance->my_retrans_flg_count == 2) {
4083 instance->my_install_seq = token->seq;
4084 }
4086 "install seq %x aru %x high seq received %x",
4087 instance->my_install_seq, instance->my_aru, instance->my_high_seq_received);
4088 if (instance->my_retrans_flg_count >= 2 &&
4089 instance->my_received_flg == 0 &&
4090 sq_lte_compare (instance->my_install_seq, instance->my_aru)) {
4091 instance->my_received_flg = 1;
4092 instance->my_deliver_memb_entries = instance->my_trans_memb_entries;
4093 memcpy (instance->my_deliver_memb_list, instance->my_trans_memb_list,
4094 sizeof (struct totem_ip_address) * instance->my_trans_memb_entries);
4095 }
4096 if (instance->my_retrans_flg_count >= 3 &&
4097 sq_lte_compare (instance->my_install_seq, token->aru)) {
4098 instance->my_rotation_counter += 1;
4099 } else {
4100 instance->my_rotation_counter = 0;
4101 }
4102 if (instance->my_rotation_counter == 2) {
4104 "retrans flag count %x token aru %x install seq %x aru %x %x",
4105 instance->my_retrans_flg_count, token->aru, instance->my_install_seq,
4106 instance->my_aru, token->seq);
4107
4108 memb_state_operational_enter (instance);
4109 instance->my_rotation_counter = 0;
4110 instance->my_retrans_flg_count = 0;
4111 }
4112 }
4113
4115 token_send (instance, token, forward_token);
4116
4117#ifdef GIVEINFO
4118 tv_current = qb_util_nano_current_get ();
4119 tv_diff = tv_current - tv_old;
4120 tv_old = tv_current;
4122 "I held %0.4f ms",
4123 ((float)tv_diff) / 1000000.0);
4124#endif
4125 if (instance->memb_state == MEMB_STATE_OPERATIONAL) {
4126 messages_deliver_to_app (instance, 0,
4127 instance->my_high_seq_received);
4128 }
4129
4130 /*
4131 * Deliver messages after token has been transmitted
4132 * to improve performance
4133 */
4134 reset_token_timeout (instance); // REVIEWED
4135 reset_token_retransmit_timeout (instance); // REVIEWED
4136 if (instance->my_id.nodeid == instance->my_ring_id.rep &&
4137 instance->my_token_held == 1) {
4138
4139 start_token_hold_retransmit_timeout (instance);
4140 }
4141
4142 token_callbacks_execute (instance, TOTEM_CALLBACK_TOKEN_SENT);
4143 }
4144 break;
4145 }
4146
4147 if ((forward_token)
4148 && instance->use_heartbeat) {
4149 reset_heartbeat_timeout(instance);
4150 }
4151 else {
4152 cancel_heartbeat_timeout(instance);
4153 }
4154
4155 return (0);
4156}
4157
/*
 * Hand messages from the regular sort queue to totemsrp_deliver_fn, in
 * sequence order, starting after my_high_delivered and going up to
 * end_point.
 *
 * skip == 0: delivery stops at the first missing sequence number (hole).
 * skip != 0: holes are stepped over, and messages whose system_from is not
 *            in my_deliver_memb_list are stepped over without delivery.
 *
 * my_high_delivered is advanced for every sequence number that is
 * processed. Asserts that the range to deliver is smaller than
 * QUEUE_RTR_ITEMS_SIZE_MAX.
 *
 * NOTE(review): the openers of the two log calls are not visible in this
 * view.
 */
4158static void messages_deliver_to_app (
4159 struct totemsrp_instance *instance,
4160 int skip,
4161 unsigned int end_point)
4162{
4163 struct sort_queue_item *sort_queue_item_p;
4164 unsigned int i;
4165 int res;
4166 struct mcast *mcast_in;
4167 struct mcast mcast_header;
4168 unsigned int range = 0;
4169 int endian_conversion_required;
4170 unsigned int my_high_delivered_stored = 0;
4171 struct srp_addr aligned_system_from;
4172
4173 range = end_point - instance->my_high_delivered;
4174
4175 if (range) {
4177 "Delivering %x to %x", instance->my_high_delivered,
4178 end_point);
4179 }
4180 assert (range < QUEUE_RTR_ITEMS_SIZE_MAX);
4181 my_high_delivered_stored = instance->my_high_delivered;
4182
4183 /*
4184 * Deliver messages in order from rtr queue to pending delivery queue
4185 */
4186 for (i = 1; i <= range; i++) {
4187
4188 void *ptr = 0;
4189
4190 /*
4191 * If out of range of sort queue, stop assembly
4192 */
4193 res = sq_in_range (&instance->regular_sort_queue,
4194 my_high_delivered_stored + i);
4195 if (res == 0) {
4196 break;
4197 }
4198
4199 res = sq_item_get (&instance->regular_sort_queue,
4200 my_high_delivered_stored + i, &ptr);
4201 /*
4202 * If hole, stop assembly
4203 */
4204 if (res != 0 && skip == 0) {
4205 break;
4206 }
4207
4208 instance->my_high_delivered = my_high_delivered_stored + i;
4209
/* hole while skipping: nothing to deliver for this sequence number */
4210 if (res != 0) {
4211 continue;
4212
4213 }
4214
4215 sort_queue_item_p = ptr;
4216
4217 mcast_in = sort_queue_item_p->mcast;
4218 assert (mcast_in != (struct mcast *)0xdeadbeef);
4219
/*
 * The stored message is kept as received; a magic that differs from
 * TOTEM_MH_MAGIC is taken to mean the header needs byte swapping.
 */
4220 endian_conversion_required = 0;
4221 if (mcast_in->header.magic != TOTEM_MH_MAGIC) {
4222 endian_conversion_required = 1;
4223 mcast_endian_convert (mcast_in, &mcast_header);
4224 } else {
4225 memcpy (&mcast_header, mcast_in, sizeof (struct mcast));
4226 }
4227
4228 aligned_system_from = mcast_header.system_from;
4229
4230 /*
4231 * Skip messages not originated in instance->my_deliver_memb
4232 */
4233 if (skip &&
4234 memb_set_subset (&aligned_system_from,
4235 1,
4236 instance->my_deliver_memb_list,
4237 instance->my_deliver_memb_entries) == 0) {
4238
/* same value as already stored above for this iteration */
4239 instance->my_high_delivered = my_high_delivered_stored + i;
4240
4241 continue;
4242 }
4243
4244 /*
4245 * Message found
4246 */
4248 "Delivering MCAST message with seq %x to pending delivery queue",
4249 mcast_header.seq);
4250
4251 /*
4252 * Message is locally originated multicast
4253 */
/* payload handed over is what follows the struct mcast header */
4254 instance->totemsrp_deliver_fn (
4255 mcast_header.header.nodeid,
4256 ((char *)sort_queue_item_p->mcast) + sizeof (struct mcast),
4257 sort_queue_item_p->msg_len - sizeof (struct mcast),
4258 endian_conversion_required);
4259 }
4260}
4261
4262/*
4263 * recv message handler called when MCAST message type received
4264 */
/*
 * Handle a received MCAST message.
 *
 * After the length check the header is copied (byte swapped if needed) and
 * the target sort queue is chosen: the recovery queue for encapsulated
 * messages, the regular queue otherwise.
 *
 * A message from a different ring id ("foreign") only influences
 * membership: depending on memb_state the sender is merged into
 * my_proc_list and the gather state is entered, or the message is counted
 * in stats.rx_msg_dropped.
 *
 * A message from the current ring is copied into a freshly allocated buffer
 * and added to the sort queue, unless its sequence number is out of range
 * or already present. my_high_seq_received and the aru are updated, and in
 * operational state messages are delivered to the application.
 *
 * Returns 0, or -1 when no buffer could be allocated.
 *
 * NOTE(review): the declaration of sort_queue_item, two case labels and the
 * opener of the log call are not visible in this view.
 * NOTE(review): msg_len <= FRAME_SIZE_MAX is enforced with assert() on data
 * that comes from the network - confirm this cannot be triggered remotely.
 */
4265static int message_handler_mcast (
4266 struct totemsrp_instance *instance,
4267 const void *msg,
4268 size_t msg_len,
4269 int endian_conversion_needed)
4270{
4272 struct sq *sort_queue;
4273 struct mcast mcast_header;
4274 struct srp_addr aligned_system_from;
4275
4276 if (check_mcast_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4277 return (0);
4278 }
4279
4280 if (endian_conversion_needed) {
4281 mcast_endian_convert (msg, &mcast_header);
4282 } else {
4283 memcpy (&mcast_header, msg, sizeof (struct mcast));
4284 }
4285
4286 if (mcast_header.header.encapsulated == MESSAGE_ENCAPSULATED) {
4287 sort_queue = &instance->recovery_sort_queue;
4288 } else {
4289 sort_queue = &instance->regular_sort_queue;
4290 }
4291
4292 assert (msg_len <= FRAME_SIZE_MAX);
4293
4294#ifdef TEST_DROP_MCAST_PERCENTAGE
4295 if (random()%100 < TEST_DROP_MCAST_PERCENTAGE) {
4296 return (0);
4297 }
4298#endif
4299
4300 /*
4301 * If the message is foreign execute the switch below
4302 */
4303 if (memcmp (&instance->my_ring_id, &mcast_header.ring_id,
4304 sizeof (struct memb_ring_id)) != 0) {
4305
4306 aligned_system_from = mcast_header.system_from;
4307
4308 switch (instance->memb_state) {
4310 memb_set_merge (
4311 &aligned_system_from, 1,
4312 instance->my_proc_list, &instance->my_proc_list_entries);
4313 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_OPERATIONAL_STATE);
4314 break;
4315
4316 case MEMB_STATE_GATHER:
/* only re-enter gather when the sender is not yet known */
4317 if (!memb_set_subset (
4318 &aligned_system_from,
4319 1,
4320 instance->my_proc_list,
4321 instance->my_proc_list_entries)) {
4322
4323 memb_set_merge (&aligned_system_from, 1,
4324 instance->my_proc_list, &instance->my_proc_list_entries);
4325 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_GATHER_STATE);
4326 return (0);
4327 }
4328 break;
4329
4330 case MEMB_STATE_COMMIT:
4331 /* discard message */
4332 instance->stats.rx_msg_dropped++;
4333 break;
4334
4336 /* discard message */
4337 instance->stats.rx_msg_dropped++;
4338 break;
4339 }
4340 return (0);
4341 }
4342
4344 "Received ringid (" CS_PRI_RING_ID ") seq %x",
4345 mcast_header.ring_id.rep,
4346 (uint64_t)mcast_header.ring_id.seq,
4347 mcast_header.seq);
4348
4349 /*
4350 * Add mcast message to rtr queue if not already in rtr queue
4351 * otherwise free io vectors
4352 */
4353 if (msg_len > 0 && msg_len <= FRAME_SIZE_MAX &&
4354 sq_in_range (sort_queue, mcast_header.seq) &&
4355 sq_item_inuse (sort_queue, mcast_header.seq) == 0) {
4356
4357 /*
4358 * Allocate new multicast memory block
4359 */
4360// TODO LEAK
4361 sort_queue_item.mcast = totemsrp_buffer_alloc (instance);
4362 if (sort_queue_item.mcast == NULL) {
4363 return (-1); /* error here is corrected by the algorithm */
4364 }
4365 memcpy (sort_queue_item.mcast, msg, msg_len);
4366 sort_queue_item.msg_len = msg_len;
4367
4368 if (sq_lt_compare (instance->my_high_seq_received,
4369 mcast_header.seq)) {
4370 instance->my_high_seq_received = mcast_header.seq;
4371 }
4372
4373 sq_item_add (sort_queue, &sort_queue_item, mcast_header.seq);
4374 }
4375
4376 update_aru (instance);
4377 if (instance->memb_state == MEMB_STATE_OPERATIONAL) {
4378 messages_deliver_to_app (instance, 0, instance->my_high_seq_received);
4379 }
4380
4381/* TODO remove from retrans message queue for old ring in recovery state */
4382 return (0);
4383}
4384
/*
 * Handle a received memb_merge_detect message.
 *
 * Messages that fail the length check, or that carry this processor's own
 * ring id, are ignored. For a merge detect from another ring the sender is
 * merged into my_proc_list and the gather state is entered; in gather
 * state this only happens when the sender is not already in my_proc_list.
 * Nothing is done in commit state.
 *
 * Every visible return statement returns 0.
 *
 * NOTE(review): the declaration of the local memb_merge_detect copy and two
 * case labels are not visible in this view.
 */
4385static int message_handler_memb_merge_detect (
4386 struct totemsrp_instance *instance,
4387 const void *msg,
4388 size_t msg_len,
4389 int endian_conversion_needed)
4390{
4392 struct srp_addr aligned_system_from;
4393
4394 if (check_memb_merge_detect_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4395 return (0);
4396 }
4397
4398 if (endian_conversion_needed) {
4399 memb_merge_detect_endian_convert (msg, &memb_merge_detect);
4400 } else {
4401 memcpy (&memb_merge_detect, msg,
4402 sizeof (struct memb_merge_detect));
4403 }
4404
4405 /*
4406 * do nothing if this is a merge detect from this configuration
4407 */
4408 if (memcmp (&instance->my_ring_id, &memb_merge_detect.ring_id,
4409 sizeof (struct memb_ring_id)) == 0) {
4410
4411 return (0);
4412 }
4413
4414 aligned_system_from = memb_merge_detect.system_from;
4415
4416 /*
4417 * Execute merge operation
4418 */
4419 switch (instance->memb_state) {
4421 memb_set_merge (&aligned_system_from, 1,
4422 instance->my_proc_list, &instance->my_proc_list_entries);
4423 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_MERGE_DURING_OPERATIONAL_STATE);
4424 break;
4425
4426 case MEMB_STATE_GATHER:
4427 if (!memb_set_subset (
4428 &aligned_system_from,
4429 1,
4430 instance->my_proc_list,
4431 instance->my_proc_list_entries)) {
4432
4433 memb_set_merge (&aligned_system_from, 1,
4434 instance->my_proc_list, &instance->my_proc_list_entries);
4435 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_MERGE_DURING_GATHER_STATE);
4436 return (0);
4437 }
4438 break;
4439
4440 case MEMB_STATE_COMMIT:
4441 /* do nothing in commit */
4442 break;
4443
4445 /* do nothing in recovery */
4446 break;
4447 }
4448 return (0);
4449}
4450
/*
 * Process a memb_join message that has already been converted to host byte
 * order.
 *
 * The proc list and the failed list are stored directly after the fixed
 * part of the message (failed list after the proc list). The visible logic
 * compares them with this processor's own lists:
 *   - both equal: the sender's agreement is recorded; once consensus is
 *     agreed a commit token is created and the commit state is entered
 *     (after "failed to receive" this processor forms a ring of its own;
 *     otherwise only when it is lowest in the configuration)
 *   - both subsets of the local lists, or the sender is in the local
 *     failed list: no change
 *   - otherwise: the received lists are merged into the local ones and the
 *     gather state is entered
 * In operational state the gather state is entered in any case where it
 * was not already entered above.
 *
 * NOTE(review): a number of lines (conditions of the LEAVE/flush handling,
 * some log call openers, arguments of several memb_set_* calls) are not
 * visible in this view; the brace structure of the first part of the
 * function cannot be fully followed here - confirm against the full source.
 */
4451static void memb_join_process (
4452 struct totemsrp_instance *instance,
4453 const struct memb_join *memb_join)
4454{
4455 struct srp_addr *proc_list;
4456 struct srp_addr *failed_list;
4457 int gather_entered = 0;
4458 int fail_minus_memb_entries = 0;
4459 struct srp_addr fail_minus_memb[PROCESSOR_COUNT_MAX];
4460 struct srp_addr aligned_system_from;
4461
4462 proc_list = (struct srp_addr *)memb_join->end_of_memb_join;
4463 failed_list = proc_list + memb_join->proc_list_entries;
4464 aligned_system_from = memb_join->system_from;
4465
4466 log_printf(instance->totemsrp_log_level_trace, "memb_join_process");
4467 memb_set_log(instance, instance->totemsrp_log_level_trace,
4468 "proclist", proc_list, memb_join->proc_list_entries);
4469 memb_set_log(instance, instance->totemsrp_log_level_trace,
4470 "faillist", failed_list, memb_join->failed_list_entries);
4471 memb_set_log(instance, instance->totemsrp_log_level_trace,
4472 "my_proclist", instance->my_proc_list, instance->my_proc_list_entries);
4473 memb_set_log(instance, instance->totemsrp_log_level_trace,
4474 "my_faillist", instance->my_failed_list, instance->my_failed_list_entries);
4475
4477 if (instance->flushing) {
4480 "Discarding LEAVE message during flush, nodeid=" CS_PRI_NODE_ID,
4482 if (memb_join->failed_list_entries > 0) {
4483 my_leave_memb_set(instance, failed_list[memb_join->failed_list_entries - 1 ].nodeid);
4484 }
4485 } else {
4487 "Discarding JOIN message during flush, nodeid=" CS_PRI_NODE_ID, memb_join->header.nodeid);
4488 }
4489 return;
4490 } else {
4493 "Received LEAVE message from " CS_PRI_NODE_ID, memb_join->failed_list_entries > 0 ? failed_list[memb_join->failed_list_entries - 1 ].nodeid : LEAVE_DUMMY_NODEID);
4494 if (memb_join->failed_list_entries > 0) {
4495 my_leave_memb_set(instance, failed_list[memb_join->failed_list_entries - 1 ].nodeid);
4496 }
4497 }
4498 }
4499
4500 }
4501
4502 if (memb_set_equal (proc_list,
4504 instance->my_proc_list,
4505 instance->my_proc_list_entries) &&
4506
4507 memb_set_equal (failed_list,
4509 instance->my_failed_list,
4510 instance->my_failed_list_entries)) {
4511
4513 memb_consensus_set (instance, &aligned_system_from);
4514 }
4515
/*
 * Consensus reached after this processor failed to receive: it continues
 * alone, with a proc list that only contains itself.
 */
4516 if (memb_consensus_agreed (instance) && instance->failed_to_recv == 1) {
4517 instance->failed_to_recv = 0;
4518 instance->my_proc_list[0] = instance->my_id;
4519 instance->my_proc_list_entries = 1;
4520 instance->my_failed_list_entries = 0;
4521
4522 memb_state_commit_token_create (instance);
4523
4524 memb_state_commit_enter (instance);
4525 return;
4526 }
4527 if (memb_consensus_agreed (instance) &&
4528 memb_lowest_in_config (instance)) {
4529
4530 memb_state_commit_token_create (instance);
4531
4532 memb_state_commit_enter (instance);
4533 } else {
4534 goto out;
4535 }
4536 } else
4537 if (memb_set_subset (proc_list,
4539 instance->my_proc_list,
4540 instance->my_proc_list_entries) &&
4541
4542 memb_set_subset (failed_list,
4544 instance->my_failed_list,
4545 instance->my_failed_list_entries)) {
4546
4547 goto out;
4548 } else
4549 if (memb_set_subset (&aligned_system_from, 1,
4550 instance->my_failed_list, instance->my_failed_list_entries)) {
4551
4552 goto out;
4553 } else {
4554 memb_set_merge (proc_list,
4556 instance->my_proc_list, &instance->my_proc_list_entries);
4557
/* the sender considers this processor failed: treat the sender as failed */
4558 if (memb_set_subset (
4559 &instance->my_id, 1,
4560 failed_list, memb_join->failed_list_entries)) {
4561
4562 memb_set_merge (
4563 &aligned_system_from, 1,
4564 instance->my_failed_list, &instance->my_failed_list_entries);
4565 } else {
4566 if (memb_set_subset (
4567 &aligned_system_from, 1,
4568 instance->my_memb_list,
4569 instance->my_memb_entries)) {
4570
4571 if (memb_set_subset (
4572 &aligned_system_from, 1,
4573 instance->my_failed_list,
4574 instance->my_failed_list_entries) == 0) {
4575
4576 memb_set_merge (failed_list,
4578 instance->my_failed_list, &instance->my_failed_list_entries);
4579 } else {
4580 memb_set_subtract (fail_minus_memb,
4581 &fail_minus_memb_entries,
4582 failed_list,
4584 instance->my_memb_list,
4585 instance->my_memb_entries);
4586
4587 memb_set_merge (fail_minus_memb,
4588 fail_minus_memb_entries,
4589 instance->my_failed_list,
4590 &instance->my_failed_list_entries);
4591 }
4592 }
4593 }
4594 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_MERGE_DURING_JOIN);
4595 gather_entered = 1;
4596 }
4597
4598out:
4599 if (gather_entered == 0 &&
4600 instance->memb_state == MEMB_STATE_OPERATIONAL) {
4601
4602 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_JOIN_DURING_OPERATIONAL_STATE);
4603 }
4604}
4605
/*
 * Byte swap a memb_join message from `in` into `out`.
 *
 * Besides the fixed fields, the proc list and the failed list that follow
 * the structure (failed list directly after the proc list) are converted
 * entry by entry. header.type is copied as is.
 *
 * `out` must have room for the fixed part plus both lists.
 *
 * NOTE(review): the statements that fill out->proc_list_entries and
 * out->failed_list_entries (and possibly other header fields) are not
 * visible in this view; the list handling below depends on them having
 * been set already.
 */
4606static void memb_join_endian_convert (const struct memb_join *in, struct memb_join *out)
4607{
4608 int i;
4609 struct srp_addr *in_proc_list;
4610 struct srp_addr *in_failed_list;
4611 struct srp_addr *out_proc_list;
4612 struct srp_addr *out_failed_list;
4613
4616 out->header.type = in->header.type;
4617 out->header.nodeid = swab32 (in->header.nodeid);
4618 out->system_from = srp_addr_endian_convert(in->system_from);
4621 out->ring_seq = swab64 (in->ring_seq);
4622
/* list positions are derived from the already converted entry counts */
4623 in_proc_list = (struct srp_addr *)in->end_of_memb_join;
4624 in_failed_list = in_proc_list + out->proc_list_entries;
4625 out_proc_list = (struct srp_addr *)out->end_of_memb_join;
4626 out_failed_list = out_proc_list + out->proc_list_entries;
4627
4628 for (i = 0; i < out->proc_list_entries; i++) {
4629 out_proc_list[i] = srp_addr_endian_convert (in_proc_list[i]);
4630 }
4631 for (i = 0; i < out->failed_list_entries; i++) {
4632 out_failed_list[i] = srp_addr_endian_convert (in_failed_list[i]);
4633 }
4634}
4635
/*
 * Byte swap a memb_commit_token message from `in` into `out`.
 *
 * The fixed fields are converted first. The variable part that follows the
 * structure consists of addr_entries srp_addr records and, directly after
 * them, addr_entries memb_commit_token_memb_entry records. A memb entry is
 * only converted when its ring_id.rep is non-zero; other entries in `out`
 * are left untouched. header.type is copied as is.
 *
 * NOTE(review): two statements at the start of the body are not visible in
 * this view.
 */
4636static void memb_commit_token_endian_convert (const struct memb_commit_token *in, struct memb_commit_token *out)
4637{
4638 int i;
4639 struct srp_addr *in_addr = (struct srp_addr *)in->end_of_commit_token;
4640 struct srp_addr *out_addr = (struct srp_addr *)out->end_of_commit_token;
4641 struct memb_commit_token_memb_entry *in_memb_list;
4642 struct memb_commit_token_memb_entry *out_memb_list;
4643
4646 out->header.type = in->header.type;
4647 out->header.nodeid = swab32 (in->header.nodeid);
4648 out->token_seq = swab32 (in->token_seq);
4649 out->ring_id.rep = swab32(in->ring_id.rep);
4650 out->ring_id.seq = swab64 (in->ring_id.seq);
4651 out->retrans_flg = swab32 (in->retrans_flg);
4652 out->memb_index = swab32 (in->memb_index);
4653 out->addr_entries = swab32 (in->addr_entries);
4654
/* the memb entries start right behind the address list */
4655 in_memb_list = (struct memb_commit_token_memb_entry *)(in_addr + out->addr_entries);
4656 out_memb_list = (struct memb_commit_token_memb_entry *)(out_addr + out->addr_entries);
4657 for (i = 0; i < out->addr_entries; i++) {
4658 out_addr[i] = srp_addr_endian_convert (in_addr[i]);
4659
4660 /*
4661 * Only convert the memb entry if it has been set
4662 */
4663 if (in_memb_list[i].ring_id.rep != 0) {
4664 out_memb_list[i].ring_id.rep = swab32(in_memb_list[i].ring_id.rep);
4665
4666 out_memb_list[i].ring_id.seq =
4667 swab64 (in_memb_list[i].ring_id.seq);
4668 out_memb_list[i].aru = swab32 (in_memb_list[i].aru);
4669 out_memb_list[i].high_delivered = swab32 (in_memb_list[i].high_delivered);
4670 out_memb_list[i].received_flg = swab32 (in_memb_list[i].received_flg);
4671 }
4672 }
4673}
4674
/*
 * Byte swap an ORF token from `in` into `out`: the fixed fields plus the
 * first rtr_list_entries items of the retransmit list. Items beyond that
 * count are not written to `out`. header.type is copied as is.
 *
 * NOTE(review): the statement that sets out->rtr_list_entries (used as the
 * loop bound) and two statements at the start of the body are not visible
 * in this view.
 */
4675static void orf_token_endian_convert (const struct orf_token *in, struct orf_token *out)
4676{
4677 int i;
4678
4681 out->header.type = in->header.type;
4682 out->header.nodeid = swab32 (in->header.nodeid);
4683 out->seq = swab32 (in->seq);
4684 out->token_seq = swab32 (in->token_seq);
4685 out->aru = swab32 (in->aru);
4686 out->ring_id.rep = swab32(in->ring_id.rep);
4687 out->aru_addr = swab32(in->aru_addr);
4688 out->ring_id.seq = swab64 (in->ring_id.seq);
4689 out->fcc = swab32 (in->fcc);
4690 out->backlog = swab32 (in->backlog);
4691 out->retrans_flg = swab32 (in->retrans_flg);
4693 for (i = 0; i < out->rtr_list_entries; i++) {
4694 out->rtr_list[i].ring_id.rep = swab32(in->rtr_list[i].ring_id.rep);
4695 out->rtr_list[i].ring_id.seq = swab64 (in->rtr_list[i].ring_id.seq);
4696 out->rtr_list[i].seq = swab32 (in->rtr_list[i].seq);
4697 }
4698}
4699
/*
 * Byte swap the header of an mcast message from `in` into `out`. Only the
 * struct mcast fields are handled; header.type is copied as is.
 *
 * NOTE(review): some statements of the body are not visible in this view.
 */
4700static void mcast_endian_convert (const struct mcast *in, struct mcast *out)
4701{
4704 out->header.type = in->header.type;
4705 out->header.nodeid = swab32 (in->header.nodeid);
4707
4708 out->seq = swab32 (in->seq);
4709 out->this_seqno = swab32 (in->this_seqno);
4710 out->ring_id.rep = swab32(in->ring_id.rep);
4711 out->ring_id.seq = swab64 (in->ring_id.seq);
4712 out->node_id = swab32 (in->node_id);
4713 out->guarantee = swab32 (in->guarantee);
4714 out->system_from = srp_addr_endian_convert(in->system_from);
4715}
4716
4717static void memb_merge_detect_endian_convert (
4718 const struct memb_merge_detect *in,
4719 struct memb_merge_detect *out)
4720{
4723 out->header.type = in->header.type;
4724 out->header.nodeid = swab32 (in->header.nodeid);
4725 out->ring_id.rep = swab32(in->ring_id.rep);
4726 out->ring_id.seq = swab64 (in->ring_id.seq);
4727 out->system_from = srp_addr_endian_convert (in->system_from);
4728}
4729
4730static int ignore_join_under_operational (
4731 struct totemsrp_instance *instance,
4732 const struct memb_join *memb_join)
4733{
4734 struct srp_addr *proc_list;
4735 struct srp_addr *failed_list;
4736 unsigned long long ring_seq;
4737 struct srp_addr aligned_system_from;
4738
4739 proc_list = (struct srp_addr *)memb_join->end_of_memb_join;
4740 failed_list = proc_list + memb_join->proc_list_entries;
4742 aligned_system_from = memb_join->system_from;
4743
4744 if (memb_set_subset (&instance->my_id, 1,
4745 failed_list, memb_join->failed_list_entries)) {
4746 return (1);
4747 }
4748
4749 /*
4750 * In operational state, my_proc_list is exactly the same as
4751 * my_memb_list.
4752 */
4753 if ((memb_set_subset (&aligned_system_from, 1,
4754 instance->my_memb_list, instance->my_memb_entries)) &&
4755 (ring_seq < instance->my_ring_id.seq)) {
4756 return (1);
4757 }
4758
4759 return (0);
4760}
4761
4762static int message_handler_memb_join (
4763 struct totemsrp_instance *instance,
4764 const void *msg,
4765 size_t msg_len,
4766 int endian_conversion_needed)
4767{
4768 const struct memb_join *memb_join;
4769 struct memb_join *memb_join_convert = alloca (msg_len);
4770 struct srp_addr aligned_system_from;
4771
4772 if (check_memb_join_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4773 return (0);
4774 }
4775
4776 if (endian_conversion_needed) {
4777 memb_join = memb_join_convert;
4778 memb_join_endian_convert (msg, memb_join_convert);
4779
4780 } else {
4781 memb_join = msg;
4782 }
4783
4784 aligned_system_from = memb_join->system_from;
4785
4786 /*
4787 * If the process paused because it wasn't scheduled in a timely
4788 * fashion, flush the join messages because they may be queued
4789 * entries
4790 */
4791 if (pause_flush (instance)) {
4792 return (0);
4793 }
4794
4795 if (instance->token_ring_id_seq < memb_join->ring_seq) {
4797 }
4798 switch (instance->memb_state) {
4800 if (!ignore_join_under_operational (instance, memb_join)) {
4801 memb_join_process (instance, memb_join);
4802 }
4803 break;
4804
4805 case MEMB_STATE_GATHER:
4806 memb_join_process (instance, memb_join);
4807 break;
4808
4809 case MEMB_STATE_COMMIT:
4810 if (memb_set_subset (&aligned_system_from,
4811 1,
4812 instance->my_new_memb_list,
4813 instance->my_new_memb_entries) &&
4814
4815 memb_join->ring_seq >= instance->my_ring_id.seq) {
4816
4817 memb_join_process (instance, memb_join);
4818 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_JOIN_DURING_COMMIT_STATE);
4819 }
4820 break;
4821
4823 if (memb_set_subset (&aligned_system_from,
4824 1,
4825 instance->my_new_memb_list,
4826 instance->my_new_memb_entries) &&
4827
4828 memb_join->ring_seq >= instance->my_ring_id.seq) {
4829
4830 memb_join_process (instance, memb_join);
4831 memb_recovery_state_token_loss (instance);
4832 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_JOIN_DURING_RECOVERY);
4833 }
4834 break;
4835 }
4836 return (0);
4837}
4838
4839static int message_handler_memb_commit_token (
4840 struct totemsrp_instance *instance,
4841 const void *msg,
4842 size_t msg_len,
4843 int endian_conversion_needed)
4844{
4845 struct memb_commit_token *memb_commit_token_convert = alloca (msg_len);
4847 struct srp_addr sub[PROCESSOR_COUNT_MAX];
4848 int sub_entries;
4849
4850 struct srp_addr *addr;
4851
4853 "got commit token");
4854
4855 if (check_memb_commit_token_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4856 return (0);
4857 }
4858
4859 if (endian_conversion_needed) {
4860 memb_commit_token_endian_convert (msg, memb_commit_token_convert);
4861 } else {
4862 memcpy (memb_commit_token_convert, msg, msg_len);
4863 }
4864 memb_commit_token = memb_commit_token_convert;
4866
4867#ifdef TEST_DROP_COMMIT_TOKEN_PERCENTAGE
4868 if (random()%100 < TEST_DROP_COMMIT_TOKEN_PERCENTAGE) {
4869 return (0);
4870 }
4871#endif
4872 switch (instance->memb_state) {
4874 /* discard token */
4875 break;
4876
4877 case MEMB_STATE_GATHER:
4878 memb_set_subtract (sub, &sub_entries,
4879 instance->my_proc_list, instance->my_proc_list_entries,
4880 instance->my_failed_list, instance->my_failed_list_entries);
4881
4882 if (memb_set_equal (addr,
4884 sub,
4885 sub_entries) &&
4886
4888 memcpy (instance->commit_token, memb_commit_token, msg_len);
4889 memb_state_commit_enter (instance);
4890 }
4891 break;
4892
4893 case MEMB_STATE_COMMIT:
4894 /*
4895 * If retransmitted commit tokens are sent on this ring
4896 * filter them out and only enter recovery once the
4897 * commit token has traversed the array. This is
4898 * determined by :
4899 * memb_commit_token->memb_index == memb_commit_token->addr_entries) {
4900 */
4901 if (memb_commit_token->ring_id.seq == instance->my_ring_id.seq &&
4903 memb_state_recovery_enter (instance, memb_commit_token);
4904 }
4905 break;
4906
4908 if (instance->my_id.nodeid == instance->my_ring_id.rep) {
4909
4910 /* Filter out duplicated tokens */
4911 if (instance->originated_orf_token) {
4912 break;
4913 }
4914
4915 instance->originated_orf_token = 1;
4916
4918 "Sending initial ORF token");
4919
4920 // TODO convert instead of initiate
4921 orf_token_send_initial (instance);
4922 reset_token_timeout (instance); // REVIEWED
4923 reset_token_retransmit_timeout (instance); // REVIEWED
4924 }
4925 break;
4926 }
4927 return (0);
4928}
4929
4930static int message_handler_token_hold_cancel (
4931 struct totemsrp_instance *instance,
4932 const void *msg,
4933 size_t msg_len,
4934 int endian_conversion_needed)
4935{
4936 const struct token_hold_cancel *token_hold_cancel = msg;
4937
4938 if (check_token_hold_cancel_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4939 return (0);
4940 }
4941
4942 if (memcmp (&token_hold_cancel->ring_id, &instance->my_ring_id,
4943 sizeof (struct memb_ring_id)) == 0) {
4944
4945 instance->my_seq_unchanged = 0;
4946 if (instance->my_ring_id.rep == instance->my_id.nodeid) {
4947 timer_function_token_retransmit_timeout (instance);
4948 }
4949 }
4950 return (0);
4951}
4952
4953static int check_message_header_validity(
4954 void *context,
4955 const void *msg,
4956 unsigned int msg_len,
4957 const struct sockaddr_storage *system_from)
4958{
4959 struct totemsrp_instance *instance = context;
4960 const struct totem_message_header *message_header = msg;
4961 const char *guessed_str;
4962 const char *msg_byte = msg;
4963
4964 if (msg_len < sizeof (struct totem_message_header)) {
4966 "Message received from %s is too short... Ignoring %u.",
4967 totemip_sa_print((struct sockaddr *)system_from), (unsigned int)msg_len);
4968 return (-1);
4969 }
4970
4971 if (message_header->magic != TOTEM_MH_MAGIC &&
4972 message_header->magic != swab16(TOTEM_MH_MAGIC)) {
4973 /*
4974 * We've received ether Knet, old version of Corosync,
4975 * or something else. Do some guessing to display (hopefully)
4976 * helpful message
4977 */
4978 guessed_str = NULL;
4979
4980 if (message_header->magic == 0xFFFF) {
4981 /*
4982 * Corosync 2.2 used header with two UINT8_MAX
4983 */
4984 guessed_str = "Corosync 2.2";
4985 } else if (message_header->magic == 0xFEFE) {
4986 /*
4987 * Corosync 2.3+ used header with two UINT8_MAX - 1
4988 */
4989 guessed_str = "Corosync 2.3+";
4990 } else if (msg_byte[0] == 0x01) {
4991 /*
4992 * Knet has stable1 with first byte of message == 1
4993 */
4994 guessed_str = "unencrypted Kronosnet";
4995 } else if (msg_byte[0] >= 0 && msg_byte[0] <= 5) {
4996 /*
4997 * Unencrypted Corosync 1.x/OpenAIS has first byte
4998 * 0-5. Collision with Knet (but still worth the try)
4999 */
5000 guessed_str = "unencrypted Corosync 2.0/2.1/1.x/OpenAIS";
5001 } else {
5002 /*
5003 * Encrypted Kronosned packet has a hash at the end of
5004 * the packet and nothing specific at the beginning of the
5005 * packet (just encrypted data).
5006 * Encrypted Corosync 1.x/OpenAIS is quite similar but hash_digest
5007 * is in the beginning of the packet.
5008 *
5009 * So it's not possible to reliably detect ether of them.
5010 */
5011 guessed_str = "encrypted Kronosnet/Corosync 2.0/2.1/1.x/OpenAIS or unknown";
5012 }
5013
5015 "Message received from %s has bad magic number (probably sent by %s).. Ignoring",
5016 totemip_sa_print((struct sockaddr *)system_from),
5017 guessed_str);
5018
5019 return (-1);
5020 }
5021
5022 if (message_header->version != TOTEM_MH_VERSION) {
5024 "Message received from %s has unsupported version %u... Ignoring",
5025 totemip_sa_print((struct sockaddr *)system_from),
5026 message_header->version);
5027
5028 return (-1);
5029 }
5030
5031 return (0);
5032}
5033
5034
5036 void *context,
5037 const void *msg,
5038 unsigned int msg_len,
5039 const struct sockaddr_storage *system_from)
5040{
5041 struct totemsrp_instance *instance = context;
5042 const struct totem_message_header *message_header = msg;
5043
5044 if (check_message_header_validity(context, msg, msg_len, system_from) == -1) {
5045 return ;
5046 }
5047
5048 switch (message_header->type) {
5050 instance->stats.orf_token_rx++;
5051 break;
5052 case MESSAGE_TYPE_MCAST:
5053 instance->stats.mcast_rx++;
5054 break;
5056 instance->stats.memb_merge_detect_rx++;
5057 break;
5059 instance->stats.memb_join_rx++;
5060 break;
5062 instance->stats.memb_commit_token_rx++;
5063 break;
5065 instance->stats.token_hold_cancel_rx++;
5066 break;
5067 default:
5069 "Message received from %s has wrong type... ignoring %d.\n",
5070 totemip_sa_print((struct sockaddr *)system_from),
5071 (int)message_header->type);
5072
5073 instance->stats.rx_msg_dropped++;
5074 return;
5075 }
5076 /*
5077 * Handle incoming message
5078 */
5079 totemsrp_message_handlers.handler_functions[(int)message_header->type] (
5080 instance,
5081 msg,
5082 msg_len,
5083 message_header->magic != TOTEM_MH_MAGIC);
5084}
5085
5087 void *context,
5088 const struct totem_ip_address *interface_addr,
5089 unsigned short ip_port,
5090 unsigned int iface_no)
5091{
5092 struct totemsrp_instance *instance = context;
5093 int res;
5094
5095 totemip_copy(&instance->my_addrs[iface_no], interface_addr);
5096
5097 res = totemnet_iface_set (
5098 instance->totemnet_context,
5099 interface_addr,
5100 ip_port,
5101 iface_no);
5102
5103 return (res);
5104}
5105
5106/* Contrary to its name, this only gets called when the interface is enabled */
5108 void *context,
5109 const struct totem_ip_address *iface_addr,
5110 unsigned int iface_no)
5111{
5112 struct totemsrp_instance *instance = context;
5113 int num_interfaces;
5114 int i;
5115
5116 if (!instance->my_id.nodeid) {
5117 instance->my_id.nodeid = iface_addr->nodeid;
5118 }
5119 totemip_copy (&instance->my_addrs[iface_no], iface_addr);
5120
5121 if (instance->iface_changes++ == 0) {
5122 instance->memb_ring_id_create_or_load (&instance->my_ring_id, instance->my_id.nodeid);
5123 /*
5124 * Increase the ring_id sequence number. This doesn't follow specification.
5125 * Solves problem with restarted leader node (node with lowest nodeid) before
5126 * rest of the cluster forms new membership and guarantees unique ring_id for
5127 * new singleton configuration.
5128 */
5129 instance->my_ring_id.seq++;
5130
5131 instance->token_ring_id_seq = instance->my_ring_id.seq;
5132 log_printf (
5133 instance->totemsrp_log_level_debug,
5134 "Created or loaded sequence id " CS_PRI_RING_ID " for this ring.",
5135 instance->my_ring_id.rep,
5136 (uint64_t)instance->my_ring_id.seq);
5137
5138 if (instance->totemsrp_service_ready_fn) {
5139 instance->totemsrp_service_ready_fn ();
5140 }
5141
5142 }
5143
5144 num_interfaces = 0;
5145 for (i = 0; i < INTERFACE_MAX; i++) {
5146 if (instance->totem_config->interfaces[i].configured) {
5147 num_interfaces++;
5148 }
5149 }
5150
5151 if (instance->iface_changes >= num_interfaces) {
5152 /* We need to clear orig_interfaces so that 'commit' diffs against nothing */
5153 instance->totem_config->orig_interfaces = malloc (sizeof (struct totem_interface) * INTERFACE_MAX);
5154 assert(instance->totem_config->orig_interfaces != NULL);
5155 memset(instance->totem_config->orig_interfaces, 0, sizeof (struct totem_interface) * INTERFACE_MAX);
5156
5158
5159 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_INTERFACE_CHANGE);
5160 free(instance->totem_config->orig_interfaces);
5161 }
5162}
5163
5165 totem_config->net_mtu -= 2 * sizeof (struct mcast);
5166}
5167
5169 void *context,
5170 void (*totem_service_ready) (void))
5171{
5172 struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5173
5174 instance->totemsrp_service_ready_fn = totem_service_ready;
5175}
5176
5178 void *context,
5179 const struct totem_ip_address *member,
5180 int iface_no)
5181{
5182 struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5183 int res;
5184
5185 res = totemnet_member_add (instance->totemnet_context, &instance->my_addrs[iface_no], member, iface_no);
5186
5187 return (res);
5188}
5189
5191 void *context,
5192 const struct totem_ip_address *member,
5193 int iface_no)
5194{
5195 struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5196 int res;
5197
5198 res = totemnet_member_remove (instance->totemnet_context, member, iface_no);
5199
5200 return (res);
5201}
5202
5204{
5205 struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5206
5207 instance->threaded_mode_enabled = 1;
5208}
5209
5210void totemsrp_trans_ack (void *context)
5211{
5212 struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5213
5214 instance->waiting_trans_ack = 0;
5216}
5217
5218
5220{
5221 struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5222 int res;
5223
5225 return (res);
5226}
5227
5229{
5230 struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5231 int res;
5232
5234 return (res);
5235}
5236
5237void totemsrp_stats_clear (void *context, int flags)
5238{
5239 struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5240
5241 memset(&instance->stats, 0, sizeof(totemsrp_stats_t));
5244 }
5245}
5246
/*
 * Invoke the ORF token timeout handler directly for the given instance.
 */
void totemsrp_force_gather (void *context)
{
	void *srp_instance = context;

	timer_function_orf_token_timeout(srp_instance);
}
totem_configuration_type
The totem_configuration_type enum.
Definition: coroapi.h:132
@ TOTEM_CONFIGURATION_REGULAR
Definition: coroapi.h:133
@ TOTEM_CONFIGURATION_TRANSITIONAL
Definition: coroapi.h:134
#define INTERFACE_MAX
Definition: coroapi.h:88
#define MESSAGE_QUEUE_MAX
Definition: coroapi.h:98
unsigned int nodeid
Definition: coroapi.h:0
unsigned char addr[TOTEMIP_ADDRLEN]
Definition: coroapi.h:2
totem_callback_token_type
The totem_callback_token_type enum.
Definition: coroapi.h:142
@ TOTEM_CALLBACK_TOKEN_SENT
Definition: coroapi.h:144
@ TOTEM_CALLBACK_TOKEN_RECEIVED
Definition: coroapi.h:143
#define PROCESSOR_COUNT_MAX
Definition: coroapi.h:96
#define CS_PRI_RING_ID_SEQ
Definition: corotypes.h:61
#define CS_PRI_NODE_ID
Definition: corotypes.h:59
#define CS_PRI_RING_ID
Definition: corotypes.h:62
uint32_t flags
uint32_t value
icmap_map_t icmap_get_global_map(void)
Return global icmap.
Definition: icmap.c:264
#define LOGSYS_LEVEL_DEBUG
Definition: logsys.h:76
struct srp_addr addr
Definition: totemsrp.c:164
int guarantee
Definition: totemsrp.c:190
unsigned int node_id
Definition: totemsrp.c:189
struct memb_ring_id ring_id
Definition: totemsrp.c:188
struct totem_message_header header
Definition: totemsrp.c:184
unsigned int seq
Definition: totemsrp.c:186
int this_seqno
Definition: totemsrp.c:187
struct srp_addr system_from
Definition: totemsrp.c:185
Definition: totemsrp.c:243
unsigned int aru
Definition: totemsrp.c:245
unsigned int received_flg
Definition: totemsrp.c:247
struct memb_ring_id ring_id
Definition: totemsrp.c:244
unsigned int high_delivered
Definition: totemsrp.c:246
unsigned int retrans_flg
Definition: totemsrp.c:255
struct totem_message_header header
Definition: totemsrp.c:252
unsigned char end_of_commit_token[0]
Definition: totemsrp.c:258
unsigned int token_seq
Definition: totemsrp.c:253
struct memb_ring_id ring_id
Definition: totemsrp.c:254
struct srp_addr system_from
Definition: totemsrp.c:217
struct totem_message_header header
Definition: totemsrp.c:216
unsigned char end_of_memb_join[0]
Definition: totemsrp.c:221
unsigned long long ring_seq
Definition: totemsrp.c:220
unsigned int failed_list_entries
Definition: totemsrp.c:219
unsigned int proc_list_entries
Definition: totemsrp.c:218
struct totem_message_header header
Definition: totemsrp.c:231
struct memb_ring_id ring_id
Definition: totemsrp.c:233
struct srp_addr system_from
Definition: totemsrp.c:232
The memb_ring_id struct.
Definition: coroapi.h:122
unsigned long long seq
Definition: coroapi.h:124
unsigned int rep
Definition: totem.h:150
int(* handler_functions[6])(struct totemsrp_instance *instance, const void *msg, size_t msg_len, int endian_conversion_needed)
Definition: totemsrp.c:535
unsigned int msg_len
Definition: totemsrp.c:269
struct mcast * mcast
Definition: totemsrp.c:268
unsigned int backlog
Definition: totemsrp.c:207
unsigned int token_seq
Definition: totemsrp.c:203
unsigned int aru_addr
Definition: totemsrp.c:205
unsigned int fcc
Definition: totemsrp.c:208
unsigned int aru
Definition: totemsrp.c:204
int rtr_list_entries
Definition: totemsrp.c:210
struct rtr_item rtr_list[0]
Definition: totemsrp.c:211
int retrans_flg
Definition: totemsrp.c:209
unsigned int seq
Definition: totemsrp.c:202
struct totem_message_header header
Definition: totemsrp.c:201
struct memb_ring_id ring_id
Definition: totemsrp.c:206
struct memb_ring_id ring_id
Definition: totemsrp.c:195
unsigned int seq
Definition: totemsrp.c:196
unsigned int msg_len
Definition: totemsrp.c:274
struct mcast * mcast
Definition: totemsrp.c:273
The sq struct.
Definition: sq.h:43
unsigned int nodeid
Definition: totemsrp.c:108
struct qb_list_head list
Definition: totemsrp.c:170
int(* callback_fn)(enum totem_callback_token_type type, const void *)
Definition: totemsrp.c:171
enum totem_callback_token_type callback_type
Definition: totemsrp.c:172
struct totem_message_header header
Definition: totemsrp.c:238
struct memb_ring_id ring_id
Definition: totemsrp.c:239
unsigned int max_messages
Definition: totem.h:220
unsigned int heartbeat_failures_allowed
Definition: totem.h:214
unsigned int token_timeout
Definition: totem.h:182
unsigned int window_size
Definition: totem.h:218
struct totem_logging_configuration totem_logging_configuration
Definition: totem.h:208
unsigned int downcheck_timeout
Definition: totem.h:200
unsigned int miss_count_const
Definition: totem.h:242
struct totem_interface * interfaces
Definition: totem.h:165
unsigned int cancel_token_hold_on_retransmit
Definition: totem.h:248
unsigned int fail_to_recv_const
Definition: totem.h:202
unsigned int merge_timeout
Definition: totem.h:198
struct totem_interface * orig_interfaces
Definition: totem.h:166
unsigned int net_mtu
Definition: totem.h:210
void(* totem_memb_ring_id_create_or_load)(struct memb_ring_id *memb_ring_id, unsigned int nodeid)
Definition: totem.h:250
unsigned int token_retransmits_before_loss_const
Definition: totem.h:190
unsigned int max_network_delay
Definition: totem.h:216
unsigned int seqno_unchanged_const
Definition: totem.h:204
unsigned int consensus_timeout
Definition: totem.h:196
unsigned int threads
Definition: totem.h:212
unsigned int send_join_timeout
Definition: totem.h:194
void(* totem_memb_ring_id_store)(const struct memb_ring_id *memb_ring_id, unsigned int nodeid)
Definition: totem.h:254
unsigned int token_retransmit_timeout
Definition: totem.h:186
unsigned int token_warning
Definition: totem.h:184
unsigned int join_timeout
Definition: totem.h:192
unsigned int token_hold_timeout
Definition: totem.h:188
struct totem_ip_address boundto
Definition: totem.h:84
uint8_t configured
Definition: totem.h:89
int member_count
Definition: totem.h:90
struct totem_ip_address member_list[PROCESSOR_COUNT_MAX]
Definition: totem.h:97
struct totem_ip_address mcast_addr
Definition: totem.h:85
The totem_ip_address struct.
Definition: coroapi.h:111
unsigned int nodeid
Definition: coroapi.h:112
unsigned short family
Definition: coroapi.h:113
void(* log_printf)(int level, int subsys, const char *function_name, const char *file_name, int file_line, const char *format,...) __attribute__((format(printf
Definition: totem.h:101
void(*) in log_level_security)
Definition: totem.h:110
unsigned int nodeid
Definition: totem.h:131
unsigned short magic
Definition: totem.h:127
uint8_t reachable
Definition: totem.h:268
uint32_t version
Definition: totem.h:266
struct totem_ip_address mcast_address
Definition: totemsrp.c:452
totemsrp_stats_t stats
Definition: totemsrp.c:516
struct srp_addr my_left_memb_list[PROCESSOR_COUNT_MAX]
Definition: totemsrp.c:320
int consensus_list_entries
Definition: totemsrp.c:300
int my_rotation_counter
Definition: totemsrp.c:358
int my_merge_detect_timeout_outstanding
Definition: totemsrp.c:346
unsigned int my_last_seq
Definition: totemsrp.c:496
qb_loop_timer_handle timer_heartbeat_timeout
Definition: totemsrp.c:419
int my_retrans_flg_count
Definition: totemsrp.c:362
unsigned int my_token_seq
Definition: totemsrp.c:396
qb_loop_timer_handle memb_timer_state_gather_join_timeout
Definition: totemsrp.c:413
struct consensus_list_item consensus_list[PROCESSOR_COUNT_MAX]
Definition: totemsrp.c:298
int totemsrp_subsys_id
Definition: totemsrp.c:436
qb_loop_timer_handle memb_timer_state_gather_consensus_timeout
Definition: totemsrp.c:415
uint64_t pause_timestamp
Definition: totemsrp.c:512
uint32_t threaded_mode_enabled
Definition: totemsrp.c:522
struct srp_addr my_memb_list[PROCESSOR_COUNT_MAX]
Definition: totemsrp.c:316
void * totemnet_context
Definition: totemsrp.c:500
int my_leave_memb_entries
Definition: totemsrp.c:338
struct srp_addr my_proc_list[PROCESSOR_COUNT_MAX]
Definition: totemsrp.c:308
int my_set_retrans_flg
Definition: totemsrp.c:360
struct srp_addr my_new_memb_list[PROCESSOR_COUNT_MAX]
Definition: totemsrp.c:312
int my_failed_list_entries
Definition: totemsrp.c:326
struct srp_addr my_failed_list[PROCESSOR_COUNT_MAX]
Definition: totemsrp.c:310
unsigned int use_heartbeat
Definition: totemsrp.c:504
void(* memb_ring_id_create_or_load)(struct memb_ring_id *memb_ring_id, unsigned int nodeid)
Definition: totemsrp.c:472
void(*) enum memb_stat memb_state)
Definition: totemsrp.c:446
qb_loop_timer_handle memb_timer_state_commit_timeout
Definition: totemsrp.c:417
struct cs_queue new_message_queue
Definition: totemsrp.c:371
int orf_token_retransmit_size
Definition: totemsrp.c:394
int my_proc_list_entries
Definition: totemsrp.c:324
unsigned int my_high_seq_received
Definition: totemsrp.c:354
void(* totemsrp_deliver_fn)(unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required)
Definition: totemsrp.c:454
int old_ring_state_aru
Definition: totemsrp.c:492
uint32_t orf_token_discard
Definition: totemsrp.c:518
struct qb_list_head token_callback_sent_listhead
Definition: totemsrp.c:390
unsigned int last_released
Definition: totemsrp.c:486
unsigned int set_aru
Definition: totemsrp.c:488
int totemsrp_log_level_notice
Definition: totemsrp.c:430
struct cs_queue new_message_queue_trans
Definition: totemsrp.c:373
int totemsrp_log_level_trace
Definition: totemsrp.c:434
char orf_token_retransmit[TOKEN_SIZE_MAX]
Definition: totemsrp.c:392
unsigned int my_trc
Definition: totemsrp.c:506
struct cs_queue retrans_message_queue
Definition: totemsrp.c:375
struct memb_ring_id my_ring_id
Definition: totemsrp.c:340
int totemsrp_log_level_error
Definition: totemsrp.c:426
void(* totemsrp_waiting_trans_ack_cb_fn)(int waiting_trans_ack)
Definition: totemsrp.c:469
unsigned int old_ring_state_high_seq_received
Definition: totemsrp.c:494
int fcc_remcast_current
Definition: totemsrp.c:296
qb_loop_timer_handle timer_orf_token_hold_retransmit_timeout
Definition: totemsrp.c:409
qb_loop_timer_handle timer_pause_timeout
Definition: totemsrp.c:401
unsigned int my_high_ring_delivered
Definition: totemsrp.c:364
qb_loop_timer_handle timer_orf_token_retransmit_timeout
Definition: totemsrp.c:407
struct totem_config * totem_config
Definition: totemsrp.c:502
int my_deliver_memb_entries
Definition: totemsrp.c:334
void(* totemsrp_service_ready_fn)(void)
Definition: totemsrp.c:467
int my_trans_memb_entries
Definition: totemsrp.c:330
uint32_t originated_orf_token
Definition: totemsrp.c:520
void * token_recv_event_handle
Definition: totemsrp.c:528
struct sq recovery_sort_queue
Definition: totemsrp.c:379
qb_loop_timer_handle timer_orf_token_timeout
Definition: totemsrp.c:403
qb_loop_timer_handle timer_merge_detect_timeout
Definition: totemsrp.c:411
int old_ring_state_saved
Definition: totemsrp.c:490
unsigned int my_leave_memb_list[PROCESSOR_COUNT_MAX]
Definition: totemsrp.c:322
void * token_sent_event_handle
Definition: totemsrp.c:529
unsigned int my_high_delivered
Definition: totemsrp.c:386
int totemsrp_log_level_security
Definition: totemsrp.c:424
int totemsrp_log_level_warning
Definition: totemsrp.c:428
struct memb_commit_token * commit_token
Definition: totemsrp.c:514
char commit_token_storage[40000]
Definition: totemsrp.c:530
struct memb_ring_id my_old_ring_id
Definition: totemsrp.c:342
struct timeval tv_old
Definition: totemsrp.c:498
int my_left_memb_entries
Definition: totemsrp.c:336
void(* memb_ring_id_store)(const struct memb_ring_id *memb_ring_id, unsigned int nodeid)
Definition: totemsrp.c:476
qb_loop_t * totemsrp_poll_handle
Definition: totemsrp.c:450
unsigned int my_install_seq
Definition: totemsrp.c:356
qb_loop_timer_handle timer_orf_token_warning
Definition: totemsrp.c:405
struct srp_addr my_trans_memb_list[PROCESSOR_COUNT_MAX]
Definition: totemsrp.c:314
struct srp_addr my_id
Definition: totemsrp.c:304
unsigned int my_cbl
Definition: totemsrp.c:510
int my_new_memb_entries
Definition: totemsrp.c:328
struct qb_list_head token_callback_received_listhead
Definition: totemsrp.c:388
unsigned int my_last_aru
Definition: totemsrp.c:348
unsigned int my_aru
Definition: totemsrp.c:384
uint32_t waiting_trans_ack
Definition: totemsrp.c:524
void(* totemsrp_log_printf)(int level, int subsys, const char *function, const char *file, int line, const char *format,...) __attribute__((format(printf
Definition: totemsrp.c:438
void(* totemsrp_confchg_fn)(enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id)
Definition: totemsrp.c:460
struct sq regular_sort_queue
Definition: totemsrp.c:377
unsigned long long token_ring_id_seq
Definition: totemsrp.c:484
struct srp_addr my_deliver_memb_list[PROCESSOR_COUNT_MAX]
Definition: totemsrp.c:318
int totemsrp_log_level_debug
Definition: totemsrp.c:432
unsigned int my_pbl
Definition: totemsrp.c:508
struct totem_ip_address my_addrs[INTERFACE_MAX]
Definition: totemsrp.c:306
uint64_t memb_join_tx
Definition: totemstats.h:59
uint32_t continuous_gather
Definition: totemstats.h:78
uint64_t recovery_entered
Definition: totemstats.h:74
uint64_t rx_msg_dropped
Definition: totemstats.h:77
uint64_t gather_entered
Definition: totemstats.h:70
uint64_t memb_commit_token_rx
Definition: totemstats.h:65
uint64_t mcast_retx
Definition: totemstats.h:62
uint64_t mcast_tx
Definition: totemstats.h:61
uint64_t memb_commit_token_tx
Definition: totemstats.h:64
uint64_t operational_token_lost
Definition: totemstats.h:69
uint64_t operational_entered
Definition: totemstats.h:68
uint64_t gather_token_lost
Definition: totemstats.h:71
uint64_t commit_token_lost
Definition: totemstats.h:73
uint64_t token_hold_cancel_tx
Definition: totemstats.h:66
uint64_t orf_token_rx
Definition: totemstats.h:56
totemsrp_token_stats_t token[TOTEM_TOKEN_STATS_MAX]
Definition: totemstats.h:90
uint64_t recovery_token_lost
Definition: totemstats.h:75
uint64_t commit_entered
Definition: totemstats.h:72
uint64_t memb_merge_detect_rx
Definition: totemstats.h:58
uint64_t memb_join_rx
Definition: totemstats.h:60
uint64_t orf_token_tx
Definition: totemstats.h:55
uint64_t memb_merge_detect_tx
Definition: totemstats.h:57
uint64_t mcast_rx
Definition: totemstats.h:63
uint64_t token_hold_cancel_rx
Definition: totemstats.h:67
uint64_t consensus_timeouts
Definition: totemstats.h:76
#define swab64(x)
The swab64 macro.
Definition: swab.h:65
#define swab16(x)
The swab16 macro.
Definition: swab.h:39
#define swab32(x)
The swab32 macro.
Definition: swab.h:51
totem_event_type
Definition: totem.h:290
#define TOTEM_MH_VERSION
Definition: totem.h:124
#define FRAME_SIZE_MAX
Definition: totem.h:52
cfg_message_crypto_reconfig_phase_t
Definition: totem.h:154
#define TOTEM_NODE_STATUS_STRUCTURE_VERSION
Definition: totem.h:264
#define TOTEM_MH_MAGIC
Definition: totem.h:123
char type
Definition: totem.h:2
void totemconfig_commit_new_params(struct totem_config *totem_config, icmap_map_t map)
Definition: totemconfig.c:2417
const char * totemip_sa_print(const struct sockaddr *sa)
Definition: totemip.c:234
void totemip_copy(struct totem_ip_address *addr1, const struct totem_ip_address *addr2)
Definition: totemip.c:123
int totemnet_iface_set(void *net_context, const struct totem_ip_address *interface_addr, unsigned short ip_port, unsigned int iface_no)
Definition: totemnet.c:471
int totemnet_member_remove(void *net_context, const struct totem_ip_address *member, int ring_no)
Definition: totemnet.c:553
void * totemnet_buffer_alloc(void *net_context)
Definition: totemnet.c:367
int totemnet_token_send(void *net_context, const void *msg, unsigned int msg_len)
Definition: totemnet.c:414
int totemnet_send_flush(void *net_context)
Definition: totemnet.c:404
int totemnet_initialize(qb_loop_t *loop_pt, void **net_context, struct totem_config *totem_config, totemsrp_stats_t *stats, void *context, void(*deliver_fn)(void *context, const void *msg, unsigned int msg_len, const struct sockaddr_storage *system_from), void(*iface_change_fn)(void *context, const struct totem_ip_address *iface_address, unsigned int ring_no), void(*mtu_changed)(void *context, int net_mtu), void(*target_set_completed)(void *context))
Definition: totemnet.c:317
void totemnet_buffer_release(void *net_context, void *ptr)
Definition: totemnet.c:375
int totemnet_mcast_flush_send(void *net_context, const void *msg, unsigned int msg_len)
Definition: totemnet.c:426
int totemnet_member_add(void *net_context, const struct totem_ip_address *local, const struct totem_ip_address *member, int ring_no)
Definition: totemnet.c:533
int totemnet_finalize(void *net_context)
Definition: totemnet.c:306
int totemnet_crypto_set(void *net_context, const char *cipher_type, const char *hash_type)
Definition: totemnet.c:292
int totemnet_ifaces_get(void *net_context, char ***status, unsigned int *iface_count)
Definition: totemnet.c:497
int totemnet_processor_count_set(void *net_context, int processor_count)
Definition: totemnet.c:383
int totemnet_token_target_set(void *net_context, unsigned int nodeid)
Definition: totemnet.c:510
int totemnet_recv_flush(void *net_context)
Definition: totemnet.c:394
int totemnet_iface_check(void *net_context)
Definition: totemnet.c:452
int totemnet_mcast_noflush_send(void *net_context, const void *msg, unsigned int msg_len)
Definition: totemnet.c:439
int totemnet_recv_mcast_empty(void *net_context)
Definition: totemnet.c:522
int totemnet_nodestatus_get(void *net_context, unsigned int nodeid, struct totem_node_status *node_status)
Definition: totemnet.c:484
void totemnet_stats_clear(void *net_context)
Definition: totemnet.c:619
int totemnet_reconfigure(void *net_context, struct totem_config *totem_config)
Definition: totemnet.c:589
int totemnet_crypto_reconfigure_phase(void *net_context, struct totem_config *totem_config, cfg_message_crypto_reconfig_phase_t phase)
Definition: totemnet.c:603
Totem Network interface - also does encryption/decryption.
int totemsrp_my_family_get(void *srp_context)
Definition: totemsrp.c:1134
#define SEQNO_START_TOKEN
Definition: totemsrp.c:122
unsigned long long ring_seq
Definition: totemsrp.c:4
#define RETRANSMIT_ENTRIES_MAX
Definition: totemsrp.c:100
unsigned long long int tv_old
Definition: totemsrp.c:3838
unsigned int seq
Definition: totemsrp.c:2
#define log_printf(level, format, args...)
Definition: totemsrp.c:690
void totemsrp_force_gather(void *context)
Definition: totemsrp.c:5247
int rtr_list_entries
Definition: totemsrp.c:9
void totemsrp_service_ready_register(void *context, void(*totem_service_ready)(void))
Definition: totemsrp.c:5168
int totemsrp_initialize(qb_loop_t *poll_handle, void **srp_context, struct totem_config *totem_config, totempg_stats_t *stats, void(*deliver_fn)(unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required), void(*confchg_fn)(enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id), void(*waiting_trans_ack_cb_fn)(int waiting_trans_ack))
Create a protocol instance.
Definition: totemsrp.c:819
int totemsrp_callback_token_create(void *srp_context, void **handle_out, enum totem_callback_token_type type, int delete, int(*callback_fn)(enum totem_callback_token_type type, const void *), const void *data)
Definition: totemsrp.c:3496
#define RETRANS_MESSAGE_QUEUE_SIZE_MAX
Definition: totemsrp.c:97
void totemsrp_threaded_mode_enable(void *context)
Definition: totemsrp.c:5203
struct rtr_item rtr_list[0]
Definition: totemsrp.c:10
message_type
Definition: totemsrp.c:146
@ MESSAGE_TYPE_MEMB_COMMIT_TOKEN
Definition: totemsrp.c:151
@ MESSAGE_TYPE_TOKEN_HOLD_CANCEL
Definition: totemsrp.c:152
@ MESSAGE_TYPE_ORF_TOKEN
Definition: totemsrp.c:147
@ MESSAGE_TYPE_MEMB_JOIN
Definition: totemsrp.c:150
@ MESSAGE_TYPE_MEMB_MERGE_DETECT
Definition: totemsrp.c:149
@ MESSAGE_TYPE_MCAST
Definition: totemsrp.c:148
void totemsrp_net_mtu_adjust(struct totem_config *totem_config)
Definition: totemsrp.c:5164
#define TOKEN_SIZE_MAX
Definition: totemsrp.c:101
encapsulation_type
Definition: totemsrp.c:155
@ MESSAGE_NOT_ENCAPSULATED
Definition: totemsrp.c:157
@ MESSAGE_ENCAPSULATED
Definition: totemsrp.c:156
unsigned int failed_list_entries
Definition: totemsrp.c:3
struct message_handlers totemsrp_message_handlers
Definition: totemsrp.c:678
int totemsrp_nodestatus_get(void *srp_context, unsigned int nodeid, struct totem_node_status *node_status)
Definition: totemsrp.c:1042
#define LEAVE_DUMMY_NODEID
Definition: totemsrp.c:102
#define QUEUE_RTR_ITEMS_SIZE_MAX
Definition: totemsrp.c:96
void main_iface_change_fn(void *context, const struct totem_ip_address *iface_address, unsigned int iface_no)
Definition: totemsrp.c:5107
int guarantee
Definition: totemsrp.c:6
unsigned int aru
Definition: totemsrp.c:3
gather_state_from
Definition: totemsrp.c:542
@ TOTEMSRP_GSFROM_THE_CONSENSUS_TIMEOUT_EXPIRED
Definition: totemsrp.c:546
@ TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_GATHER_STATE
Definition: totemsrp.c:551
@ TOTEMSRP_GSFROM_FAILED_TO_RECEIVE
Definition: totemsrp.c:549
@ TOTEMSRP_GSFROM_CONSENSUS_TIMEOUT
Definition: totemsrp.c:543
@ TOTEMSRP_GSFROM_MERGE_DURING_OPERATIONAL_STATE
Definition: totemsrp.c:552
@ TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_OPERATIONAL_STATE
Definition: totemsrp.c:545
@ TOTEMSRP_GSFROM_MERGE_DURING_JOIN
Definition: totemsrp.c:554
@ TOTEMSRP_GSFROM_INTERFACE_CHANGE
Definition: totemsrp.c:558
@ TOTEMSRP_GSFROM_GATHER_MISSING1
Definition: totemsrp.c:544
@ TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_COMMIT_STATE
Definition: totemsrp.c:547
@ TOTEMSRP_GSFROM_MAX
Definition: totemsrp.c:559
@ TOTEMSRP_GSFROM_JOIN_DURING_COMMIT_STATE
Definition: totemsrp.c:556
@ TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_RECOVERY_STATE
Definition: totemsrp.c:548
@ TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_OPERATIONAL_STATE
Definition: totemsrp.c:550
@ TOTEMSRP_GSFROM_JOIN_DURING_OPERATIONAL_STATE
Definition: totemsrp.c:555
@ TOTEMSRP_GSFROM_MERGE_DURING_GATHER_STATE
Definition: totemsrp.c:553
@ TOTEMSRP_GSFROM_JOIN_DURING_RECOVERY
Definition: totemsrp.c:557
int totemsrp_crypto_set(void *srp_context, const char *cipher_type, const char *hash_type)
Definition: totemsrp.c:1109
int totemsrp_avail(void *srp_context)
Return number of available messages that can be queued.
Definition: totemsrp.c:2569
void totemsrp_stats_clear(void *context, int flags)
Definition: totemsrp.c:5237
int totemsrp_iface_set(void *context, const struct totem_ip_address *interface_addr, unsigned short ip_port, unsigned int iface_no)
Definition: totemsrp.c:5086
void totemsrp_finalize(void *srp_context)
Definition: totemsrp.c:1027
struct memb_ring_id ring_id
Definition: totemsrp.c:4
void totemsrp_trans_ack(void *context)
Definition: totemsrp.c:5210
int totemsrp_crypto_reconfigure_phase(void *context, struct totem_config *totem_config, cfg_message_crypto_reconfig_phase_t phase)
Definition: totemsrp.c:5228
unsigned int totemsrp_my_nodeid_get(void *srp_context)
Definition: totemsrp.c:1123
int addr_entries
Definition: totemsrp.c:5
unsigned int backlog
Definition: totemsrp.c:6
#define SEQNO_START_MSG
Definition: totemsrp.c:121
void totemsrp_event_signal(void *srp_context, enum totem_event_type type, int value)
Definition: totemsrp.c:2489
void totemsrp_callback_token_destroy(void *srp_context, void **handle_out)
Definition: totemsrp.c:3531
unsigned int received_flg
Definition: totemsrp.c:3
struct message_item __attribute__
int totemsrp_member_add(void *context, const struct totem_ip_address *member, int iface_no)
Definition: totemsrp.c:5177
int totemsrp_ifaces_get(void *srp_context, unsigned int nodeid, unsigned int *interface_id, struct totem_ip_address *interfaces, unsigned int interfaces_size, char ***status, unsigned int *iface_count)
Definition: totemsrp.c:1071
int totemsrp_reconfigure(void *context, struct totem_config *totem_config)
Definition: totemsrp.c:5219
unsigned int high_delivered
Definition: totemsrp.c:2
struct srp_addr system_from
Definition: totemsrp.c:1
unsigned int proc_list_entries
Definition: totemsrp.c:2
const char * gather_state_from_desc[]
Definition: totemsrp.c:562
int totemsrp_member_remove(void *context, const struct totem_ip_address *member, int iface_no)
Definition: totemsrp.c:5190
int totemsrp_mcast(void *srp_context, struct iovec *iovec, unsigned int iov_len, int guarantee)
Multicast a message.
Definition: totemsrp.c:2498
void main_deliver_fn(void *context, const void *msg, unsigned int msg_len, const struct sockaddr_storage *system_from)
Definition: totemsrp.c:5035
memb_state
Definition: totemsrp.c:277
@ MEMB_STATE_GATHER
Definition: totemsrp.c:279
@ MEMB_STATE_RECOVERY
Definition: totemsrp.c:281
@ MEMB_STATE_COMMIT
Definition: totemsrp.c:280
@ MEMB_STATE_OPERATIONAL
Definition: totemsrp.c:278
Totem Single Ring Protocol.
#define TOTEMPG_STATS_CLEAR_TRANSPORT
Definition: totemstats.h:116
#define TOTEM_TOKEN_STATS_MAX
Definition: totemstats.h:89