corosync 3.1.7
totempg.c
1/*
2 * Copyright (c) 2003-2005 MontaVista Software, Inc.
3 * Copyright (c) 2005 OSDL.
4 * Copyright (c) 2006-2012 Red Hat, Inc.
5 *
6 * All rights reserved.
7 *
8 * Author: Steven Dake (sdake@redhat.com)
9 * Author: Mark Haverkamp (markh@osdl.org)
10 *
11 * This software licensed under BSD license, the text of which follows:
12 *
13 * Redistribution and use in source and binary forms, with or without
14 * modification, are permitted provided that the following conditions are met:
15 *
16 * - Redistributions of source code must retain the above copyright notice,
17 * this list of conditions and the following disclaimer.
18 * - Redistributions in binary form must reproduce the above copyright notice,
19 * this list of conditions and the following disclaimer in the documentation
20 * and/or other materials provided with the distribution.
21 * - Neither the name of the MontaVista Software, Inc. nor the names of its
22 * contributors may be used to endorse or promote products derived from this
23 * software without specific prior written permission.
24 *
25 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
26 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
27 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
28 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
29 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
30 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
31 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
32 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
33 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
34 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
35 * THE POSSIBILITY OF SUCH DAMAGE.
36 */
37
38/*
39 * FRAGMENTATION AND PACKING ALGORITHM:
40 *
41 * Assemble the entire message into one buffer
42 * if full fragment
43 * store fragment into lengths list
44 * for each full fragment
45 * multicast fragment
46 * set length and fragment fields of pg message
47 * store remaining multicast into head of fragmentation data and set lens field
48 *
49 * If a message exceeds the maximum packet size allowed by the totem
50 * single ring protocol, the protocol could lose forward progress.
51 * Statically calculating the allowed data amount doesn't work because
52 * the amount of data allowed depends on the number of fragments in
53 * each message. In this implementation, the maximum fragment size
54 * is dynamically calculated for each fragment added to the message.
55
56 * It is possible for a message to be two bytes short of the maximum
57 * packet size. This occurs when a message or collection of
58 * messages + the mcast header + the lens are two bytes short of the
59 * end of the packet. Since another len field consumes two bytes, the
60 * len field would consume the rest of the packet without room for data.
61 *
62 * One optimization would be to forgo the final len field and determine
63 * it from the size of the udp datagram. Then this condition would no
64 * longer occur.
65 */
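/*
 * Worked example of the two-bytes-short condition above (illustrative
 * numbers, assuming no structure padding): with net_mtu = 1500,
 * sizeof (struct totempg_mcast) is 8, leaving 1492 bytes per packet
 * for the len fields plus packed data. If the lens and data packed so
 * far occupy 1490 of those bytes, the next len field (2 bytes) would
 * consume the remainder of the packet with no room left for the data
 * it describes.
 */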
66
67/*
68 * ASSEMBLY AND UNPACKING ALGORITHM:
69 *
70 * copy incoming packet into assembly data buffer indexed by current
71 * location of end of fragment
72 *
73 * if not fragmented
74 * deliver all messages in assembly data buffer
75 * else
76 * if msg_count > 1 and fragmented
77 * deliver all messages except last message in assembly data buffer
78 * copy last fragmented section to start of assembly data buffer
79 * else
80 * if msg_count = 1 and fragmented
81 * do nothing
82 *
83 */
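/*
 * Walkthrough of the above for a message split across two packets:
 * packet one arrives with msg_count = 3 and the fragmented field set,
 * so the two complete messages are delivered and the trailing fragment
 * is copied to the start of the assembly data buffer; packet two
 * arrives with a matching continuation field, its data is appended at
 * the saved index, and the now-complete message is delivered.
 */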
84
85#include <config.h>
86
87#ifdef HAVE_ALLOCA_H
88#include <alloca.h>
89#endif
90#include <sys/types.h>
91#include <sys/socket.h>
92#include <netinet/in.h>
93#include <arpa/inet.h>
94#include <sys/uio.h>
95#include <stdio.h>
96#include <stdlib.h>
97#include <string.h>
98#include <assert.h>
99#include <pthread.h>
100#include <errno.h>
101#include <limits.h>
102
103#include <corosync/swab.h>
104#include <qb/qblist.h>
105#include <qb/qbloop.h>
106#include <qb/qbipcs.h>
108#define LOGSYS_UTILS_ONLY 1
109#include <corosync/logsys.h>
110
111#include "util.h"
112#include "totemsrp.h"
113
114struct totempg_mcast_header {
115 short version;
116 short type;
117};
118
119#if !(defined(__i386__) || defined(__x86_64__))
120/*
121 * Need alignment on architectures other than i386 or x86_64
122 */
123#define TOTEMPG_NEED_ALIGN 1
124#endif
125
126/*
127 * totempg_mcast structure
128 *
129 * header: Identify the mcast.
130 * fragmented: Set if this message continues into next message
131 * continuation: Set if this message is a continuation from last message
132 * msg_count Indicates how many packed messages are contained
133 * in the mcast.
134 * Also, the size of each packed message and the messages themselves are
135 * appended to the end of this structure when sent.
136 */
137struct totempg_mcast {
138 struct totempg_mcast_header header;
139 unsigned char fragmented;
140 unsigned char continuation;
141 unsigned short msg_count;
142 /*
143 * short msg_len[msg_count];
144 */
145 /*
146 * data for messages
147 */
148};
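/*
 * Resulting on-the-wire layout of one multicast, as parsed by
 * totempg_deliver_fn () below:
 *
 * struct totempg_mcast header, fragmented, continuation, msg_count
 * unsigned short lens[msg_count] size of each packed message
 * packed message data lens[0] + ... + lens[msg_count - 1] bytes
 */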
149
150/*
151 * Maximum packet size for totem pg messages
152 */
153#define TOTEMPG_PACKET_SIZE (totempg_totem_config->net_mtu - \
154 sizeof (struct totempg_mcast))
155
156/*
157 * Local variables used for packing small messages
158 */
159static unsigned short mcast_packed_msg_lens[FRAME_SIZE_MAX];
160
161static int mcast_packed_msg_count = 0;
162
163static int totempg_reserved = 1;
164
165static unsigned int totempg_size_limit;
166
167static totem_queue_level_changed_fn totem_queue_level_changed = NULL;
168
169static uint32_t totempg_threaded_mode = 0;
170
171static void *totemsrp_context;
172
173/*
174 * Function and data used to log messages
175 */
176static int totempg_log_level_security;
177static int totempg_log_level_error;
178static int totempg_log_level_warning;
179static int totempg_log_level_notice;
180static int totempg_log_level_debug;
181static int totempg_subsys_id;
182static void (*totempg_log_printf) (
183 int level,
184 int subsys,
185 const char *function,
186 const char *file,
187 int line,
188 const char *format, ...) __attribute__((format(printf, 6, 7)));
189
190static struct totem_config *totempg_totem_config;
191
192static totempg_stats_t totempg_stats;
193
194enum throw_away_mode {
195 THROW_AWAY_INACTIVE,
196 THROW_AWAY_ACTIVE
197};
198
199struct assembly {
200 unsigned int nodeid;
201 unsigned char data[MESSAGE_SIZE_MAX+KNET_MAX_PACKET_SIZE];
202 int index;
203 unsigned char last_frag_num;
204 enum throw_away_mode throw_away_mode;
205 struct qb_list_head list;
206};
207
208static void assembly_deref (struct assembly *assembly);
209
210static int callback_token_received_fn (enum totem_callback_token_type type,
211 const void *data);
212
213QB_LIST_DECLARE(assembly_list_inuse);
214
215/*
216 * Free list is used both for transitional and operational assemblies
217 */
218QB_LIST_DECLARE(assembly_list_free);
219
220QB_LIST_DECLARE(assembly_list_inuse_trans);
221
222QB_LIST_DECLARE(totempg_groups_list);
223
224/*
225 * Staging buffer for packed messages. Messages are staged in this buffer
226 * before sending. Multiple messages may fit which cuts down on the
227 * number of mcasts sent. If a message doesn't completely fit, then
228 * the mcast header has a fragment bit set that says that there is more
229 * data to follow. fragment_size is an index into the buffer. It indicates
230 * the size of message data and where to place new message data.
231 * fragment_continuation indicates whether the first packed message in
232 * the buffer is a continuation of a previously packed fragment.
233 */
234static unsigned char *fragmentation_data;
235
236static int fragment_size = 0;
237
238static int fragment_continuation = 0;
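/*
 * Example of the staging variables in action: two small messages
 * passed to mcast_msg () that both fit leave their sizes in
 * mcast_packed_msg_lens[], advance fragment_size past both payloads
 * and keep fragment_continuation at 0; the next token arrival
 * (callback_token_received_fn) then sends them in a single mcast.
 */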
239
240static int totempg_waiting_transack = 0;
241
242struct totempg_group_instance {
243 void (*deliver_fn) (
244 unsigned int nodeid,
245 const void *msg,
246 unsigned int msg_len,
247 int endian_conversion_required);
248
249 void (*confchg_fn) (
250 enum totem_configuration_type configuration_type,
251 const unsigned int *member_list, size_t member_list_entries,
252 const unsigned int *left_list, size_t left_list_entries,
253 const unsigned int *joined_list, size_t joined_list_entries,
254 const struct memb_ring_id *ring_id);
255
256 struct totempg_group *groups;
257
258 int groups_cnt;
259 int32_t q_level;
260
261 struct qb_list_head list;
262};
263
264static unsigned char next_fragment = 1;
265
266static pthread_mutex_t totempg_mutex = PTHREAD_MUTEX_INITIALIZER;
267
268static pthread_mutex_t callback_token_mutex = PTHREAD_MUTEX_INITIALIZER;
269
270static pthread_mutex_t mcast_msg_mutex = PTHREAD_MUTEX_INITIALIZER;
271
272#define log_printf(level, format, args...) \
273do { \
274 totempg_log_printf(level, \
275 totempg_subsys_id, \
276 __FUNCTION__, __FILE__, __LINE__, \
277 format, ##args); \
278} while (0);
279
280static int msg_count_send_ok (int msg_count);
281
282static int byte_count_send_ok (int byte_count);
283
284static void totempg_waiting_trans_ack_cb (int waiting_trans_ack)
285{
286 log_printf(LOG_DEBUG, "waiting_trans_ack changed to %u", waiting_trans_ack);
287 totempg_waiting_transack = waiting_trans_ack;
288}
289
290static struct assembly *assembly_ref (unsigned int nodeid)
291{
292 struct assembly *assembly;
293 struct qb_list_head *list;
294 struct qb_list_head *active_assembly_list_inuse;
295
296 if (totempg_waiting_transack) {
297 active_assembly_list_inuse = &assembly_list_inuse_trans;
298 } else {
299 active_assembly_list_inuse = &assembly_list_inuse;
300 }
301
302 /*
303 * Search inuse list for node id and return assembly buffer if found
304 */
305 qb_list_for_each(list, active_assembly_list_inuse) {
306 assembly = qb_list_entry (list, struct assembly, list);
307
308 if (nodeid == assembly->nodeid) {
309 return (assembly);
310 }
311 }
312
313 /*
314 * Nothing found in inuse list get one from free list if available
315 */
316 if (qb_list_empty (&assembly_list_free) == 0) {
317 assembly = qb_list_first_entry (&assembly_list_free, struct assembly, list);
318 qb_list_del (&assembly->list);
319 qb_list_add (&assembly->list, active_assembly_list_inuse);
320 assembly->nodeid = nodeid;
321 assembly->index = 0;
322 assembly->last_frag_num = 0;
323 assembly->throw_away_mode = THROW_AWAY_INACTIVE;
324 return (assembly);
325 }
326
327 /*
328 * Nothing available in inuse or free list, so allocate a new one
329 */
330 assembly = malloc (sizeof (struct assembly));
331 /*
332 * TODO handle memory allocation failure here
333 */
334 assert (assembly);
335 assembly->nodeid = nodeid;
336 assembly->data[0] = 0;
337 assembly->index = 0;
338 assembly->last_frag_num = 0;
339 assembly->throw_away_mode = THROW_AWAY_INACTIVE;
340 qb_list_init (&assembly->list);
341 qb_list_add (&assembly->list, active_assembly_list_inuse);
342
343 return (assembly);
344}
345
346static void assembly_deref (struct assembly *assembly)
347{
348 qb_list_del (&assembly->list);
349 qb_list_add (&assembly->list, &assembly_list_free);
350}
351
352static void assembly_deref_from_normal_and_trans (int nodeid)
353{
354 int j;
355 struct qb_list_head *list, *tmp_iter;
356 struct qb_list_head *active_assembly_list_inuse;
357 struct assembly *assembly;
358
359 for (j = 0; j < 2; j++) {
360 if (j == 0) {
361 active_assembly_list_inuse = &assembly_list_inuse;
362 } else {
363 active_assembly_list_inuse = &assembly_list_inuse_trans;
364 }
365
366 qb_list_for_each_safe(list, tmp_iter, active_assembly_list_inuse) {
367 assembly = qb_list_entry (list, struct assembly, list);
368
369 if (nodeid == assembly->nodeid) {
370 qb_list_del (&assembly->list);
371 qb_list_add (&assembly->list, &assembly_list_free);
372 }
373 }
374 }
375
376}
377
378static inline void app_confchg_fn (
379 enum totem_configuration_type configuration_type,
380 const unsigned int *member_list, size_t member_list_entries,
381 const unsigned int *left_list, size_t left_list_entries,
382 const unsigned int *joined_list, size_t joined_list_entries,
383 const struct memb_ring_id *ring_id)
384{
385 int i;
386 struct totempg_group_instance *instance;
387 struct qb_list_head *list;
388
389 /*
390 * For every leaving processor, add to free list
391 * This also has the side effect of clearing out the dataset
392 * in the leaving processor's assembly buffer.
393 */
394 for (i = 0; i < left_list_entries; i++) {
395 assembly_deref_from_normal_and_trans (left_list[i]);
396 }
397
398 qb_list_for_each(list, &totempg_groups_list) {
399 instance = qb_list_entry (list, struct totempg_group_instance, list);
400
401 if (instance->confchg_fn) {
402 instance->confchg_fn (
403 configuration_type,
404 member_list,
405 member_list_entries,
406 left_list,
407 left_list_entries,
408 joined_list,
409 joined_list_entries,
410 ring_id);
411 }
412 }
413}
414
415static inline void group_endian_convert (
416 void *msg,
417 int msg_len)
418{
419 unsigned short *group_len;
420 int i;
421 char *aligned_msg;
422
423#ifdef TOTEMPG_NEED_ALIGN
424 /*
425 * Align data structure on architectures other than i386 or x86_64
426 */
427 if ((size_t)msg % sizeof(char *) != 0) {
428 aligned_msg = alloca(msg_len);
429 memcpy(aligned_msg, msg, msg_len);
430 } else {
431 aligned_msg = msg;
432 }
433#else
434 aligned_msg = msg;
435#endif
436
437 group_len = (unsigned short *)aligned_msg;
438 group_len[0] = swab16(group_len[0]);
439 for (i = 1; i < group_len[0] + 1; i++) {
440 group_len[i] = swab16(group_len[i]);
441 }
442
443 if (aligned_msg != msg) {
444 memcpy(msg, aligned_msg, msg_len);
445 }
446}
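/*
 * Layout of the group header that group_endian_convert () swaps and
 * group_matches () parses, as built by the mcast_joined/mcast_groups
 * marshalling code below:
 *
 * group_len[0] number of groups in the message
 * group_len[1..n] length of each group name
 * group names concatenated, unpadded
 * application message data follows immediately
 */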
447
448static inline int group_matches (
449 struct iovec *iovec,
450 unsigned int iov_len,
451 struct totempg_group *groups_b,
452 unsigned int group_b_cnt,
453 unsigned int *adjust_iovec)
454{
455 unsigned short *group_len;
456 char *group_name;
457 int i;
458 int j;
459#ifdef TOTEMPG_NEED_ALIGN
460 struct iovec iovec_aligned = { NULL, 0 };
461#endif
462
463 assert (iov_len == 1);
464
465#ifdef TOTEMPG_NEED_ALIGN
466 /*
467 * Align data structure on architectures other than i386 or x86_64
468 */
469 if ((size_t)iovec->iov_base % sizeof(char *) != 0) {
470 iovec_aligned.iov_base = alloca(iovec->iov_len);
471 memcpy(iovec_aligned.iov_base, iovec->iov_base, iovec->iov_len);
472 iovec_aligned.iov_len = iovec->iov_len;
473 iovec = &iovec_aligned;
474 }
475#endif
476
477 group_len = (unsigned short *)iovec->iov_base;
478 group_name = ((char *)iovec->iov_base) +
479 sizeof (unsigned short) * (group_len[0] + 1);
480
481
482 /*
483 * Calculate amount to adjust the iovec by before delivering to app
484 */
485 *adjust_iovec = sizeof (unsigned short) * (group_len[0] + 1);
486 for (i = 1; i < group_len[0] + 1; i++) {
487 *adjust_iovec += group_len[i];
488 }
489
490 /*
491 * Determine if this message should be delivered to this instance
492 */
493 for (i = 1; i < group_len[0] + 1; i++) {
494 for (j = 0; j < group_b_cnt; j++) {
495 if ((group_len[i] == groups_b[j].group_len) &&
496 (memcmp (groups_b[j].group, group_name, group_len[i]) == 0)) {
497 return (1);
498 }
499 }
500 group_name += group_len[i];
501 }
502 return (0);
503}
504
505
506static inline void app_deliver_fn (
507 unsigned int nodeid,
508 void *msg,
509 unsigned int msg_len,
510 int endian_conversion_required)
511{
512 struct totempg_group_instance *instance;
513 struct iovec stripped_iovec;
514 unsigned int adjust_iovec;
515 struct iovec *iovec;
516 struct qb_list_head *list;
517
518 struct iovec aligned_iovec = { NULL, 0 };
519
520 if (endian_conversion_required) {
521 group_endian_convert (msg, msg_len);
522 }
523
524 /*
525 * TODO: segmentation/assembly need to be redesigned to provide aligned access
526 * in all cases to avoid memory copies on non-i386 archs. Probably broke backwards
527 * compatibility
528 */
529
530#ifdef TOTEMPG_NEED_ALIGN
531 /*
532 * Align data structure on architectures other than i386 or x86_64
533 */
534 aligned_iovec.iov_base = alloca(msg_len);
535 aligned_iovec.iov_len = msg_len;
536 memcpy(aligned_iovec.iov_base, msg, msg_len);
537#else
538 aligned_iovec.iov_base = msg;
539 aligned_iovec.iov_len = msg_len;
540#endif
541
542 iovec = &aligned_iovec;
543
544 qb_list_for_each(list, &totempg_groups_list) {
545 instance = qb_list_entry (list, struct totempg_group_instance, list);
546 if (group_matches (iovec, 1, instance->groups, instance->groups_cnt, &adjust_iovec)) {
547 stripped_iovec.iov_len = iovec->iov_len - adjust_iovec;
548 stripped_iovec.iov_base = (char *)iovec->iov_base + adjust_iovec;
549
550#ifdef TOTEMPG_NEED_ALIGN
551 /*
552 * Align data structure on architectures other than i386 or x86_64
553 */
554 if ((uintptr_t)((char *)iovec->iov_base + adjust_iovec) % (sizeof(char *)) != 0) {
555 /*
556 * Deal with misalignment
557 */
558 stripped_iovec.iov_base =
559 alloca (stripped_iovec.iov_len);
560 memcpy (stripped_iovec.iov_base,
561 (char *)iovec->iov_base + adjust_iovec,
562 stripped_iovec.iov_len);
563 }
564#endif
565 instance->deliver_fn (
566 nodeid,
567 stripped_iovec.iov_base,
568 stripped_iovec.iov_len,
569 endian_conversion_required);
570 }
571 }
572}
573
574static void totempg_confchg_fn (
575 enum totem_configuration_type configuration_type,
576 const unsigned int *member_list, size_t member_list_entries,
577 const unsigned int *left_list, size_t left_list_entries,
578 const unsigned int *joined_list, size_t joined_list_entries,
579 const struct memb_ring_id *ring_id)
580{
581// TODO optimize this
582 app_confchg_fn (configuration_type,
583 member_list, member_list_entries,
584 left_list, left_list_entries,
585 joined_list, joined_list_entries,
586 ring_id);
587}
588
589static void totempg_deliver_fn (
590 unsigned int nodeid,
591 const void *msg,
592 unsigned int msg_len,
593 int endian_conversion_required)
594{
595 struct totempg_mcast *mcast;
596 unsigned short *msg_lens;
597 int i;
598 struct assembly *assembly;
599 char header[FRAME_SIZE_MAX];
600 int msg_count;
601 int continuation;
602 int start;
603 const char *data;
604 int datasize;
605 struct iovec iov_delv;
606 size_t expected_msg_len;
607
608 assembly = assembly_ref (nodeid);
609 assert (assembly);
610
611 if (msg_len < sizeof(struct totempg_mcast)) {
612 log_printf(LOG_WARNING,
613 "Message (totempg_mcast) received from node " CS_PRI_NODE_ID " is too short... Ignoring.", nodeid);
614
615 return ;
616 }
617
618 /*
619 * Assemble the header into one block of data and
620 * assemble the packet contents into one block of data to simplify delivery
621 */
622
623 mcast = (struct totempg_mcast *)msg;
624 if (endian_conversion_required) {
625 mcast->msg_count = swab16 (mcast->msg_count);
626 }
627
628 msg_count = mcast->msg_count;
629 datasize = sizeof (struct totempg_mcast) +
630 msg_count * sizeof (unsigned short);
631
632 if (msg_len < datasize) {
633 log_printf(LOG_WARNING,
634 "Message (totempg_mcast datasize) received from node " CS_PRI_NODE_ID
635 " is too short... Ignoring.", nodeid);
636
637 return ;
638 }
639
640 memcpy (header, msg, datasize);
641 data = msg;
642
643 msg_lens = (unsigned short *) (header + sizeof (struct totempg_mcast));
644 expected_msg_len = datasize;
645 for (i = 0; i < mcast->msg_count; i++) {
646 if (endian_conversion_required) {
647 msg_lens[i] = swab16 (msg_lens[i]);
648 }
649
650 expected_msg_len += msg_lens[i];
651 }
652
653 if (msg_len != expected_msg_len) {
654 log_printf(LOG_WARNING,
655 "Message (totempg_mcast) received from node " CS_PRI_NODE_ID
656 " doesn't have expected length of %zu (has %u) bytes... Ignoring.",
657 nodeid, expected_msg_len, msg_len);
658
659 return ;
660 }
661
662 assert((assembly->index+msg_len) < sizeof(assembly->data));
663 memcpy (&assembly->data[assembly->index], &data[datasize],
664 msg_len - datasize);
665
666 /*
667 * If the last message in the buffer is a fragment, then we
668 * can't deliver it. We'll first deliver the full messages
669 * then adjust the assembly buffer so we can add the rest of the
670 * fragment when it arrives.
671 */
672 msg_count = mcast->fragmented ? mcast->msg_count - 1 : mcast->msg_count;
673 continuation = mcast->continuation;
674 iov_delv.iov_base = (void *)&assembly->data[0];
675 iov_delv.iov_len = assembly->index + msg_lens[0];
676
677 /*
678 * Make sure that if this message is a continuation, that it
679 * matches the sequence number of the previous fragment.
680 * Also, if the first packed message is a continuation
681 * of a previous message, but the assembly buffer
682 * is empty, then we need to discard it since we can't
683 * assemble a complete message. Likewise, if this message isn't a
684 * continuation and the assembly buffer is empty, we have to discard
685 * the continued message.
686 */
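/*
 * throw_away_mode implements the discard cases: after a continuation
 * mismatch, THROW_AWAY_ACTIVE drops the leading stale fragment of each
 * subsequent packet until one arrives whose first message starts fresh
 * (fragmented 0 or 1), at which point reassembly resumes.
 */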
687 start = 0;
688
689 if (assembly->throw_away_mode == THROW_AWAY_ACTIVE) {
690 /* Throw away the first msg block */
691 if (mcast->fragmented == 0 || mcast->fragmented == 1) {
692 assembly->throw_away_mode = THROW_AWAY_INACTIVE;
693
694 assembly->index += msg_lens[0];
695 iov_delv.iov_base = (void *)&assembly->data[assembly->index];
696 iov_delv.iov_len = msg_lens[1];
697 start = 1;
698 }
699 } else
700 if (assembly->throw_away_mode == THROW_AWAY_INACTIVE) {
701 if (continuation == assembly->last_frag_num) {
702 assembly->last_frag_num = mcast->fragmented;
703 for (i = start; i < msg_count; i++) {
704 app_deliver_fn(nodeid, iov_delv.iov_base, iov_delv.iov_len,
705 endian_conversion_required);
706 assembly->index += msg_lens[i];
707 iov_delv.iov_base = (void *)&assembly->data[assembly->index];
708 if (i < (msg_count - 1)) {
709 iov_delv.iov_len = msg_lens[i + 1];
710 }
711 }
712 } else {
713 log_printf (LOG_DEBUG, "fragmented continuation %u is not equal to assembly last_frag_num %u",
714 continuation, assembly->last_frag_num);
715 assembly->throw_away_mode = THROW_AWAY_ACTIVE;
716 }
717 }
718
719 if (mcast->fragmented == 0) {
720 /*
721 * End of messages, dereference assembly struct
722 */
723 assembly->last_frag_num = 0;
724 assembly->index = 0;
725 assembly_deref (assembly);
726 } else {
727 /*
728 * Message is fragmented, keep around assembly list
729 */
730 if (mcast->msg_count > 1) {
731 memmove (&assembly->data[0],
732 &assembly->data[assembly->index],
733 msg_lens[msg_count]);
734
735 assembly->index = 0;
736 }
737 assembly->index += msg_lens[msg_count];
738 }
739}
740
741/*
742 * Totem Process Group Abstraction
743 * depends on poll abstraction, POSIX, IPV4
744 */
745
746void *callback_token_received_handle;
747
748int callback_token_received_fn (enum totem_callback_token_type type,
749 const void *data)
750{
751 struct totempg_mcast mcast;
752 struct iovec iovecs[3];
753
754 if (totempg_threaded_mode == 1) {
755 pthread_mutex_lock (&mcast_msg_mutex);
756 }
757 if (mcast_packed_msg_count == 0) {
758 if (totempg_threaded_mode == 1) {
759 pthread_mutex_unlock (&mcast_msg_mutex);
760 }
761 return (0);
762 }
763 if (totemsrp_avail(totemsrp_context) == 0) {
764 if (totempg_threaded_mode == 1) {
765 pthread_mutex_unlock (&mcast_msg_mutex);
766 }
767 return (0);
768 }
769 mcast.header.version = 0;
770 mcast.header.type = 0;
771 mcast.fragmented = 0;
772
773 /*
774 * Was the first message in this buffer a continuation of a
775 * fragmented message?
776 */
777 mcast.continuation = fragment_continuation;
778 fragment_continuation = 0;
779
780 mcast.msg_count = mcast_packed_msg_count;
781
782 iovecs[0].iov_base = (void *)&mcast;
783 iovecs[0].iov_len = sizeof (struct totempg_mcast);
784 iovecs[1].iov_base = (void *)mcast_packed_msg_lens;
785 iovecs[1].iov_len = mcast_packed_msg_count * sizeof (unsigned short);
786 iovecs[2].iov_base = (void *)&fragmentation_data[0];
787 iovecs[2].iov_len = fragment_size;
788 (void)totemsrp_mcast (totemsrp_context, iovecs, 3, 0);
789
790 mcast_packed_msg_count = 0;
791 fragment_size = 0;
792
793 if (totempg_threaded_mode == 1) {
794 pthread_mutex_unlock (&mcast_msg_mutex);
795 }
796 return (0);
797}
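/*
 * The staging buffer is flushed from callback_token_received_fn ()
 * above: messages packed since the last token rotation are sent as one
 * mcast the next time this node sees the token and totemsrp_avail ()
 * reports queue space.
 */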
798
799/*
800 * Initialize the totem process group abstraction
801 */
802int totempg_initialize (
803 qb_loop_t *poll_handle,
804 struct totem_config *totem_config)
805{
806 int res;
807
808 totempg_totem_config = totem_config;
809 totempg_log_level_security = totem_config->totem_logging_configuration.log_level_security;
810 totempg_log_level_error = totem_config->totem_logging_configuration.log_level_error;
811 totempg_log_level_warning = totem_config->totem_logging_configuration.log_level_warning;
812 totempg_log_level_notice = totem_config->totem_logging_configuration.log_level_notice;
813 totempg_log_level_debug = totem_config->totem_logging_configuration.log_level_debug;
814 totempg_log_printf = totem_config->totem_logging_configuration.log_printf;
815 totempg_subsys_id = totem_config->totem_logging_configuration.log_subsys_id;
816
817 fragmentation_data = malloc (TOTEMPG_PACKET_SIZE);
818 if (fragmentation_data == 0) {
819 return (-1);
820 }
821
822 totemsrp_net_mtu_adjust (totem_config);
823
824 res = totemsrp_initialize (
825 poll_handle,
826 &totemsrp_context,
827 totem_config,
828 &totempg_stats,
829 totempg_deliver_fn,
830 totempg_confchg_fn,
831 totempg_waiting_trans_ack_cb);
832
833 if (res == -1) {
834 goto error_exit;
835 }
836
837 totemsrp_callback_token_create (
838 totemsrp_context,
839 &callback_token_received_handle,
840 TOTEM_CALLBACK_TOKEN_RECEIVED,
841 0,
842 callback_token_received_fn,
843 0);
844
845 totempg_size_limit = (totemsrp_avail(totemsrp_context) - 1) *
846 (totempg_totem_config->net_mtu -
847 sizeof (struct totempg_mcast) - 16);
848
849 qb_list_init (&totempg_groups_list);
850
851error_exit:
852 return (res);
853}
854
855void totempg_finalize (void)
856{
857 if (totempg_threaded_mode == 1) {
858 pthread_mutex_lock (&totempg_mutex);
859 }
860 totemsrp_finalize (totemsrp_context);
861 if (totempg_threaded_mode == 1) {
862 pthread_mutex_unlock (&totempg_mutex);
863 }
864}
865
866/*
867 * Multicast a message
868 */
869static int mcast_msg (
870 struct iovec *iovec_in,
871 unsigned int iov_len,
872 int guarantee)
873{
874 int res = 0;
875 struct totempg_mcast mcast;
876 struct iovec iovecs[3];
877 struct iovec iovec[64];
878 int i;
879 int dest, src;
880 int max_packet_size = 0;
881 int copy_len = 0;
882 int copy_base = 0;
883 int total_size = 0;
884
885 if (totempg_threaded_mode == 1) {
886 pthread_mutex_lock (&mcast_msg_mutex);
887 }
888 totemsrp_event_signal (totemsrp_context, TOTEM_EVENT_NEW_MSG, 1);
889
890 /*
891 * Remove zero length iovectors from the list
892 */
893 assert (iov_len < 64);
894 for (dest = 0, src = 0; src < iov_len; src++) {
895 if (iovec_in[src].iov_len) {
896 memcpy (&iovec[dest++], &iovec_in[src],
897 sizeof (struct iovec));
898 }
899 }
900 iov_len = dest;
901
902 max_packet_size = TOTEMPG_PACKET_SIZE -
903 (sizeof (unsigned short) * (mcast_packed_msg_count + 1));
904
905 mcast_packed_msg_lens[mcast_packed_msg_count] = 0;
906
907 /*
908 * Check if we would overwrite new message queue
909 */
910 for (i = 0; i < iov_len; i++) {
911 total_size += iovec[i].iov_len;
912 }
913
914 if (byte_count_send_ok (total_size + sizeof(unsigned short) *
915 (mcast_packed_msg_count)) == 0) {
916
917 if (totempg_threaded_mode == 1) {
918 pthread_mutex_unlock (&mcast_msg_mutex);
919 }
920 return(-1);
921 }
922
923 memset(&mcast, 0, sizeof(mcast));
924
925 mcast.header.version = 0;
926 for (i = 0; i < iov_len; ) {
927 mcast.fragmented = 0;
928 mcast.continuation = fragment_continuation;
929 copy_len = iovec[i].iov_len - copy_base;
930
931 /*
932 * If it all fits with room left over, copy it in.
933 * We need to leave at least sizeof(short) + 1 bytes in the
934 * fragment_buffer on exit so that max_packet_size + fragment_size
935 * doesn't exceed the size of the fragment_buffer on the next call.
936 */
937 if ((iovec[i].iov_len + fragment_size) <
938 (max_packet_size - sizeof (unsigned short))) {
939
940 memcpy (&fragmentation_data[fragment_size],
941 (char *)iovec[i].iov_base + copy_base, copy_len);
942 fragment_size += copy_len;
943 mcast_packed_msg_lens[mcast_packed_msg_count] += copy_len;
944 next_fragment = 1;
945 copy_len = 0;
946 copy_base = 0;
947 i++;
948 continue;
949
950 /*
951 * If it just fits or is too big, then send out what fits.
952 */
953 } else {
954 unsigned char *data_ptr;
955
956 copy_len = min(copy_len, max_packet_size - fragment_size);
957 if( copy_len == max_packet_size )
958 data_ptr = (unsigned char *)iovec[i].iov_base + copy_base;
959 else {
960 data_ptr = fragmentation_data;
961 }
962
963 memcpy (&fragmentation_data[fragment_size],
964 (unsigned char *)iovec[i].iov_base + copy_base, copy_len);
965 mcast_packed_msg_lens[mcast_packed_msg_count] += copy_len;
966
967 /*
968 * if we're not on the last iovec or the iovec is too large to
969 * fit, then indicate a fragment. This also means that the next
970 * message will have the continuation of this one.
971 */
972 if ((i < (iov_len - 1)) ||
973 ((copy_base + copy_len) < iovec[i].iov_len)) {
974 if (!next_fragment) {
975 next_fragment++;
976 }
977 fragment_continuation = next_fragment;
978 mcast.fragmented = next_fragment++;
979 assert(fragment_continuation != 0);
980 assert(mcast.fragmented != 0);
981 } else {
982 fragment_continuation = 0;
983 }
984
985 /*
986 * assemble the message and send it
987 */
988 mcast.msg_count = ++mcast_packed_msg_count;
989 iovecs[0].iov_base = (void *)&mcast;
990 iovecs[0].iov_len = sizeof(struct totempg_mcast);
991 iovecs[1].iov_base = (void *)mcast_packed_msg_lens;
992 iovecs[1].iov_len = mcast_packed_msg_count *
993 sizeof(unsigned short);
994 iovecs[2].iov_base = (void *)data_ptr;
995 iovecs[2].iov_len = fragment_size + copy_len;
996 assert (totemsrp_avail(totemsrp_context) > 0);
997 res = totemsrp_mcast (totemsrp_context, iovecs, 3, guarantee);
998 if (res == -1) {
999 goto error_exit;
1000 }
1001
1002 /*
1003 * Recalculate counts and indexes for the next.
1004 */
1005 mcast_packed_msg_lens[0] = 0;
1006 mcast_packed_msg_count = 0;
1007 fragment_size = 0;
1008 max_packet_size = TOTEMPG_PACKET_SIZE - (sizeof(unsigned short));
1009
1010 /*
1011 * If the iovec all fit, go to the next iovec
1012 */
1013 if ((copy_base + copy_len) == iovec[i].iov_len) {
1014 copy_len = 0;
1015 copy_base = 0;
1016 i++;
1017
1018 /*
1019 * Continue with the rest of the current iovec.
1020 */
1021 } else {
1022 copy_base += copy_len;
1023 }
1024 }
1025 }
1026
1027 /*
1028 * Bump only if we added message data. This may be zero if
1029 * the last buffer just fit into the fragmentation_data buffer
1030 * and we were at the last iovec.
1031 */
1032 if (mcast_packed_msg_lens[mcast_packed_msg_count]) {
1033 mcast_packed_msg_count++;
1034 }
1035
1036error_exit:
1037 if (totempg_threaded_mode == 1) {
1038 pthread_mutex_unlock (&mcast_msg_mutex);
1039 }
1040 return (res);
1041}
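/*
 * Sketch of the fragmentation loop in mcast_msg () above for a message
 * larger than max_packet_size: each full packet is multicast
 * immediately with mcast.fragmented set to next_fragment (an
 * incrementing, wrapping sequence number that skips 0), and the
 * receiver matches that value against the continuation field of the
 * following packet; a final piece that fits with room to spare is left
 * in fragmentation_data to be packed with later messages and flushed
 * by callback_token_received_fn ().
 */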
1042
1043/*
1044 * Determine if msg_count messages could be queued
1045 */
1046static int msg_count_send_ok (
1047 int msg_count)
1048{
1049 int avail = 0;
1050
1051 avail = totemsrp_avail (totemsrp_context);
1052 totempg_stats.msg_queue_avail = avail;
1053
1054 return ((avail - totempg_reserved) > msg_count);
1055}
1056
1057static int byte_count_send_ok (
1058 int byte_count)
1059{
1060 unsigned int msg_count = 0;
1061 int avail = 0;
1062
1063 avail = totemsrp_avail (totemsrp_context);
1064
1065 msg_count = (byte_count / (totempg_totem_config->net_mtu - sizeof (struct totempg_mcast) - 16)) + 1;
1066
1067 return (avail >= msg_count);
1068}
1069
1070static int send_reserve (
1071 int msg_size)
1072{
1073 unsigned int msg_count = 0;
1074
1075 msg_count = (msg_size / (totempg_totem_config->net_mtu - sizeof (struct totempg_mcast) - 16)) + 1;
1076 totempg_reserved += msg_count;
1077 totempg_stats.msg_reserved = totempg_reserved;
1078
1079 return (msg_count);
1080}
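/*
 * Example of the reservation arithmetic in send_reserve () above
 * (illustrative numbers, assuming no structure padding): with
 * net_mtu = 1500 the divisor is 1500 - 8 - 16 = 1476, so reserving a
 * 4000 byte message reserves 4000 / 1476 + 1 = 3 totemsrp messages.
 */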
1081
1082static void send_release (
1083 int msg_count)
1084{
1085 totempg_reserved -= msg_count;
1086 totempg_stats.msg_reserved = totempg_reserved;
1087}
1088
1089#ifndef HAVE_SMALL_MEMORY_FOOTPRINT
1090#undef MESSAGE_QUEUE_MAX
1091#define MESSAGE_QUEUE_MAX ((4 * MESSAGE_SIZE_MAX) / totempg_totem_config->net_mtu)
1092#endif /* HAVE_SMALL_MEMORY_FOOTPRINT */
1093
1094static uint32_t q_level_precent_used(void)
1095{
1096 return (100 - (((totemsrp_avail(totemsrp_context) - totempg_reserved) * 100) / MESSAGE_QUEUE_MAX));
1097}
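/*
 * Example: if totemsrp_avail () minus totempg_reserved leaves half of
 * MESSAGE_QUEUE_MAX free, the function above returns 50 (percent of
 * the new message queue in use).
 */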
1098
1099int totempg_callback_token_create (
1100 void **handle_out,
1101 enum totem_callback_token_type type,
1102 int delete,
1103 int (*callback_fn) (enum totem_callback_token_type type, const void *),
1104 const void *data)
1105{
1106 unsigned int res;
1107 if (totempg_threaded_mode == 1) {
1108 pthread_mutex_lock (&callback_token_mutex);
1109 }
1110 res = totemsrp_callback_token_create (totemsrp_context, handle_out, type, delete,
1111 callback_fn, data);
1112 if (totempg_threaded_mode == 1) {
1113 pthread_mutex_unlock (&callback_token_mutex);
1114 }
1115 return (res);
1116}
1117
1118void totempg_callback_token_destroy (
1119 void *handle_out)
1120{
1121 if (totempg_threaded_mode == 1) {
1122 pthread_mutex_lock (&callback_token_mutex);
1123 }
1124 totemsrp_callback_token_destroy (totemsrp_context, handle_out);
1125 if (totempg_threaded_mode == 1) {
1126 pthread_mutex_unlock (&callback_token_mutex);
1127 }
1128}
1129
1130/*
1131 * vi: set autoindent tabstop=4 shiftwidth=4 :
1132 */
1133
1134int totempg_groups_initialize (
1135 void **totempg_groups_instance,
1136
1137 void (*deliver_fn) (
1138 unsigned int nodeid,
1139 const void *msg,
1140 unsigned int msg_len,
1141 int endian_conversion_required),
1142
1143 void (*confchg_fn) (
1144 enum totem_configuration_type configuration_type,
1145 const unsigned int *member_list, size_t member_list_entries,
1146 const unsigned int *left_list, size_t left_list_entries,
1147 const unsigned int *joined_list, size_t joined_list_entries,
1148 const struct memb_ring_id *ring_id))
1149{
1150 struct totempg_group_instance *instance;
1151
1152 if (totempg_threaded_mode == 1) {
1153 pthread_mutex_lock (&totempg_mutex);
1154 }
1155
1156 instance = malloc (sizeof (struct totempg_group_instance));
1157 if (instance == NULL) {
1158 goto error_exit;
1159 }
1160
1161 instance->deliver_fn = deliver_fn;
1162 instance->confchg_fn = confchg_fn;
1163 instance->groups = 0;
1164 instance->groups_cnt = 0;
1165 instance->q_level = QB_LOOP_MED;
1166 qb_list_init (&instance->list);
1167 qb_list_add (&instance->list, &totempg_groups_list);
1168
1169 if (totempg_threaded_mode == 1) {
1170 pthread_mutex_unlock (&totempg_mutex);
1171 }
1172 *totempg_groups_instance = instance;
1173 return (0);
1174
1175error_exit:
1176 if (totempg_threaded_mode == 1) {
1177 pthread_mutex_unlock (&totempg_mutex);
1178 }
1179 return (-1);
1180}
1181
1182int totempg_groups_join (
1183 void *totempg_groups_instance,
1184 const struct totempg_group *groups,
1185 size_t group_cnt)
1186{
1187 struct totempg_group_instance *instance = (struct totempg_group_instance *)totempg_groups_instance;
1188 struct totempg_group *new_groups;
1189 int res = 0;
1190
1191 if (totempg_threaded_mode == 1) {
1192 pthread_mutex_lock (&totempg_mutex);
1193 }
1194
1195 new_groups = realloc (instance->groups,
1196 sizeof (struct totempg_group) *
1197 (instance->groups_cnt + group_cnt));
1198 if (new_groups == 0) {
1199 res = -1;
1200 goto error_exit;
1201 }
1202 memcpy (&new_groups[instance->groups_cnt],
1203 groups, group_cnt * sizeof (struct totempg_group));
1204 instance->groups = new_groups;
1205 instance->groups_cnt += group_cnt;
1206
1207error_exit:
1208 if (totempg_threaded_mode == 1) {
1209 pthread_mutex_unlock (&totempg_mutex);
1210 }
1211 return (res);
1212}
1213
1214int totempg_groups_leave (
1215 void *totempg_groups_instance,
1216 const struct totempg_group *groups,
1217 size_t group_cnt)
1218{
1219 if (totempg_threaded_mode == 1) {
1220 pthread_mutex_lock (&totempg_mutex);
1221 }
1222
1223 if (totempg_threaded_mode == 1) {
1224 pthread_mutex_unlock (&totempg_mutex);
1225 }
1226 return (0);
1227}
1228
1229#define MAX_IOVECS_FROM_APP 32
1230#define MAX_GROUPS_PER_MSG 32
1231
1232int totempg_groups_mcast_joined (
1233 void *totempg_groups_instance,
1234 const struct iovec *iovec,
1235 unsigned int iov_len,
1236 int guarantee)
1237{
1238 struct totempg_group_instance *instance = (struct totempg_group_instance *)totempg_groups_instance;
1239 unsigned short group_len[MAX_GROUPS_PER_MSG + 1];
1240 struct iovec iovec_mcast[MAX_GROUPS_PER_MSG + 1 + MAX_IOVECS_FROM_APP];
1241 int i;
1242 unsigned int res;
1243
1244 if (totempg_threaded_mode == 1) {
1245 pthread_mutex_lock (&totempg_mutex);
1246 }
1247
1248 /*
1249 * Build group_len structure and the iovec_mcast structure
1250 */
1251 group_len[0] = instance->groups_cnt;
1252 for (i = 0; i < instance->groups_cnt; i++) {
1253 group_len[i + 1] = instance->groups[i].group_len;
1254 iovec_mcast[i + 1].iov_len = instance->groups[i].group_len;
1255 iovec_mcast[i + 1].iov_base = (void *) instance->groups[i].group;
1256 }
1257 iovec_mcast[0].iov_len = (instance->groups_cnt + 1) * sizeof (unsigned short);
1258 iovec_mcast[0].iov_base = group_len;
1259 for (i = 0; i < iov_len; i++) {
1260 iovec_mcast[i + instance->groups_cnt + 1].iov_len = iovec[i].iov_len;
1261 iovec_mcast[i + instance->groups_cnt + 1].iov_base = iovec[i].iov_base;
1262 }
1263
1264 res = mcast_msg (iovec_mcast, iov_len + instance->groups_cnt + 1, guarantee);
1265
1266 if (totempg_threaded_mode == 1) {
1267 pthread_mutex_unlock (&totempg_mutex);
1268 }
1269
1270 return (res);
1271}
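/*
 * Example: an instance joined to groups "a" (len 1) and "bc" (len 2)
 * sending one application iovec builds iovec_mcast in
 * totempg_groups_mcast_joined () above as:
 *
 * [0] group_len[] = { 2, 1, 2 } (6 bytes)
 * [1] "a"
 * [2] "bc"
 * [3] application data
 *
 * which mcast_msg () then packs and, if needed, fragments.
 */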
1272
1273static void check_q_level(
1274 void *totempg_groups_instance)
1275{
1276 struct totempg_group_instance *instance = (struct totempg_group_instance *)totempg_groups_instance;
1277 int32_t old_level = instance->q_level;
1278 int32_t percent_used = q_level_precent_used();
1279
1280 if (percent_used >= 75 && instance->q_level != TOTEM_Q_LEVEL_CRITICAL) {
1281 instance->q_level = TOTEM_Q_LEVEL_CRITICAL;
1282 } else if (percent_used < 30 && instance->q_level != TOTEM_Q_LEVEL_LOW) {
1283 instance->q_level = TOTEM_Q_LEVEL_LOW;
1284 } else if (percent_used > 40 && percent_used < 50 && instance->q_level != TOTEM_Q_LEVEL_GOOD) {
1285 instance->q_level = TOTEM_Q_LEVEL_GOOD;
1286 } else if (percent_used > 60 && percent_used < 70 && instance->q_level != TOTEM_Q_LEVEL_HIGH) {
1287 instance->q_level = TOTEM_Q_LEVEL_HIGH;
1288 }
1289 if (totem_queue_level_changed && old_level != instance->q_level) {
1290 totem_queue_level_changed(instance->q_level);
1291 }
1292}
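/*
 * Note the gaps between the bands in check_q_level () above (30-40,
 * 50-60 and 70-75 percent): a queue hovering around a boundary does
 * not flip levels on every call, and totem_queue_level_changed only
 * fires when the level actually moves.
 */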
1293
1294void totempg_check_q_level(
1295 void *totempg_groups_instance)
1296{
1297 struct totempg_group_instance *instance = (struct totempg_group_instance *)totempg_groups_instance;
1298
1299 check_q_level(instance);
1300}
1301
1302int totempg_groups_joined_reserve (
1303 void *totempg_groups_instance,
1304 const struct iovec *iovec,
1305 unsigned int iov_len)
1306{
1307 struct totempg_group_instance *instance = (struct totempg_group_instance *)totempg_groups_instance;
1308 unsigned int size = 0;
1309 unsigned int i;
1310 unsigned int reserved = 0;
1311
1312 if (totempg_threaded_mode == 1) {
1313 pthread_mutex_lock (&totempg_mutex);
1314 pthread_mutex_lock (&mcast_msg_mutex);
1315 }
1316
1317 for (i = 0; i < instance->groups_cnt; i++) {
1318 size += instance->groups[i].group_len;
1319 }
1320 for (i = 0; i < iov_len; i++) {
1321 size += iovec[i].iov_len;
1322 }
1323
1324 if (size >= totempg_size_limit) {
1325 reserved = -1;
1326 goto error_exit;
1327 }
1328
1329 if (byte_count_send_ok (size)) {
1330 reserved = send_reserve (size);
1331 } else {
1332 reserved = 0;
1333 }
1334
1335error_exit:
1336 check_q_level(instance);
1337
1338 if (totempg_threaded_mode == 1) {
1339 pthread_mutex_unlock (&mcast_msg_mutex);
1340 pthread_mutex_unlock (&totempg_mutex);
1341 }
1342 return (reserved);
1343}
1344
1345
1346int totempg_groups_joined_release (int msg_count)
1347{
1348 if (totempg_threaded_mode == 1) {
1349 pthread_mutex_lock (&totempg_mutex);
1350 pthread_mutex_lock (&mcast_msg_mutex);
1351 }
1352 send_release (msg_count);
1353 if (totempg_threaded_mode == 1) {
1354 pthread_mutex_unlock (&mcast_msg_mutex);
1355 pthread_mutex_unlock (&totempg_mutex);
1356 }
1357 return 0;
1358}
1359
1360int totempg_groups_mcast_groups (
1361 void *totempg_groups_instance,
1362 int guarantee,
1363 const struct totempg_group *groups,
1364 size_t groups_cnt,
1365 const struct iovec *iovec,
1366 unsigned int iov_len)
1367{
1368 unsigned short group_len[MAX_GROUPS_PER_MSG + 1];
1369 struct iovec iovec_mcast[MAX_GROUPS_PER_MSG + 1 + MAX_IOVECS_FROM_APP];
1370 int i;
1371 unsigned int res;
1372
1373 if (totempg_threaded_mode == 1) {
1374 pthread_mutex_lock (&totempg_mutex);
1375 }
1376
1377 /*
1378 * Build group_len structure and the iovec_mcast structure
1379 */
1380 group_len[0] = groups_cnt;
1381 for (i = 0; i < groups_cnt; i++) {
1382 group_len[i + 1] = groups[i].group_len;
1383 iovec_mcast[i + 1].iov_len = groups[i].group_len;
1384 iovec_mcast[i + 1].iov_base = (void *) groups[i].group;
1385 }
1386 iovec_mcast[0].iov_len = (groups_cnt + 1) * sizeof (unsigned short);
1387 iovec_mcast[0].iov_base = group_len;
1388 for (i = 0; i < iov_len; i++) {
1389 iovec_mcast[i + groups_cnt + 1].iov_len = iovec[i].iov_len;
1390 iovec_mcast[i + groups_cnt + 1].iov_base = iovec[i].iov_base;
1391 }
1392
1393 res = mcast_msg (iovec_mcast, iov_len + groups_cnt + 1, guarantee);
1394
1395 if (totempg_threaded_mode == 1) {
1396 pthread_mutex_unlock (&totempg_mutex);
1397 }
1398 return (res);
1399}
1400
1401/*
1402 * Returns -1 if error, 0 if can't send, 1 if can send the message
1403 */
1404int totempg_groups_send_ok_groups (
1405 void *totempg_groups_instance,
1406 const struct totempg_group *groups,
1407 size_t groups_cnt,
1408 const struct iovec *iovec,
1409 unsigned int iov_len)
1410{
1411 unsigned int size = 0;
1412 unsigned int i;
1413 unsigned int res;
1414
1415 if (totempg_threaded_mode == 1) {
1416 pthread_mutex_lock (&totempg_mutex);
1417 }
1418
1419 for (i = 0; i < groups_cnt; i++) {
1420 size += groups[i].group_len;
1421 }
1422 for (i = 0; i < iov_len; i++) {
1423 size += iovec[i].iov_len;
1424 }
1425
1426 res = msg_count_send_ok (size);
1427
1428 if (totempg_threaded_mode == 1) {
1429 pthread_mutex_unlock (&totempg_mutex);
1430 }
1431 return (res);
1432}
1433
1434int totempg_iface_set (
1435 struct totem_ip_address *interface_addr,
1436 unsigned short ip_port,
1437 unsigned int iface_no)
1438{
1439 int res;
1440
1441 res = totemsrp_iface_set (
1442 totemsrp_context,
1443 interface_addr,
1444 ip_port,
1445 iface_no);
1446
1447 return (res);
1448}
1449
1450int totempg_nodestatus_get (unsigned int nodeid,
1451 struct totem_node_status *node_status)
1452{
1453 memset(node_status, 0, sizeof(struct totem_node_status));
1454 return totemsrp_nodestatus_get (totemsrp_context, nodeid, node_status);
1455}
1456
1457int totempg_ifaces_get (
1458 unsigned int nodeid,
1459 unsigned int *interface_id,
1460 struct totem_ip_address *interfaces,
1461 unsigned int interfaces_size,
1462 char ***status,
1463 unsigned int *iface_count)
1464{
1465 int res;
1466
1467 res = totemsrp_ifaces_get (
1468 totemsrp_context,
1469 nodeid,
1470 interface_id,
1471 interfaces,
1472 interfaces_size,
1473 status,
1474 iface_count);
1475
1476 return (res);
1477}
1478
1479void totempg_event_signal (enum totem_event_type type, int value)
1480{
1481 totemsrp_event_signal (totemsrp_context, type, value);
1482}
1483
1484void *totempg_get_stats (void)
1485{
1486 return &totempg_stats;
1487}
1488
1489int totempg_crypto_set (
1490 const char *cipher_type,
1491 const char *hash_type)
1492{
1493 int res;
1494
1495 res = totemsrp_crypto_set (totemsrp_context, cipher_type, hash_type);
1496
1497 return (res);
1498}
1499
1500#define ONE_IFACE_LEN 63
1501const char *totempg_ifaces_print (unsigned int nodeid)
1502{
1503 static char iface_string[256 * INTERFACE_MAX];
1504 char one_iface[ONE_IFACE_LEN+1];
1505 struct totem_ip_address interfaces[INTERFACE_MAX];
1506 unsigned int iface_count;
1507 unsigned int iface_ids[INTERFACE_MAX];
1508 unsigned int i;
1509 int res;
1510
1511 iface_string[0] = '\0';
1512
1513 res = totempg_ifaces_get (nodeid, iface_ids, interfaces, INTERFACE_MAX, NULL, &iface_count);
1514 if (res == -1) {
1515 return ("no interface found for nodeid");
1516 }
1517
1518 res = totempg_ifaces_get (nodeid, iface_ids, interfaces, INTERFACE_MAX, NULL, &iface_count);
1519
1520 for (i = 0; i < iface_count; i++) {
1521 if (!interfaces[i].family) {
1522 continue;
1523 }
1524 snprintf (one_iface, ONE_IFACE_LEN,
1525 "r(%d) ip(%s) ",
1526 i, totemip_print (&interfaces[i]));
1527 strcat (iface_string, one_iface);
1528 }
1529 return (iface_string);
1530}
1531
1532unsigned int totempg_my_nodeid_get (void)
1533{
1534 return (totemsrp_my_nodeid_get(totemsrp_context));
1535}
1536
1537int totempg_my_family_get (void)
1538{
1539 return (totemsrp_my_family_get(totemsrp_context));
1540}
1541void totempg_service_ready_register (
1542 void (*totem_service_ready) (void))
1543{
1544 totemsrp_service_ready_register (totemsrp_context, totem_service_ready);
1545}
1546
1547void totempg_queue_level_register_callback (totem_queue_level_changed_fn fn)
1548{
1549 totem_queue_level_changed = fn;
1550}
1551
1552int totempg_member_add (
1553 const struct totem_ip_address *member,
1554 int ring_no)
1555{
1556 return totemsrp_member_add (totemsrp_context, member, ring_no);
1557}
1558
1559int totempg_member_remove (
1560 const struct totem_ip_address *member,
1561 int ring_no)
1562{
1563 return totemsrp_member_remove (totemsrp_context, member, ring_no);
1564}
1565
1566extern int totempg_reconfigure (void)
1567{
1568 return totemsrp_reconfigure (totemsrp_context, totempg_totem_config);
1569}
1570
1571int totempg_crypto_reconfigure_phase (cfg_message_crypto_reconfig_phase_t phase)
1572{
1573 return totemsrp_crypto_reconfigure_phase (totemsrp_context, totempg_totem_config, phase);
1574}
1575
1576void totempg_stats_clear (int flags)
1577{
1578 if (flags & TOTEMPG_STATS_CLEAR_TOTEM) {
1579 totempg_stats.msg_reserved = 0;
1580 totempg_stats.msg_queue_avail = 0;
1581 }
1582 return totemsrp_stats_clear (totemsrp_context, flags);
1583}
1584
1585void totempg_threaded_mode_enable (void)
1586{
1587 totempg_threaded_mode = 1;
1588 totemsrp_threaded_mode_enable (totemsrp_context);
1589}
1590
1591void totempg_trans_ack (void)
1592{
1593 totemsrp_trans_ack (totemsrp_context);
1594}
1595
1596void totempg_force_gather (void)
1597{
1598 totemsrp_force_gather(totemsrp_context);
1599}
1600
1601/* Assumes ->orig_interfaces is already allocated */
1602void totempg_get_config (struct totem_config *config)
1603{
1604 struct totem_interface *temp_if = config->orig_interfaces;
1605
1606 memcpy(config, totempg_totem_config, sizeof(struct totem_config));
1607 config->orig_interfaces = temp_if;
1609 config->interfaces = NULL;
1610}
1611
1612void totempg_put_config (struct totem_config *config)
1613{
1614 struct totem_interface *temp_if = totempg_totem_config->interfaces;
1615
1616 /* Preserve the existing interfaces[] array as transports might have pointers saved */
1617 memcpy(totempg_totem_config->interfaces, config->interfaces, sizeof(struct totem_interface) * INTERFACE_MAX);
1618 memcpy(totempg_totem_config, config, sizeof(struct totem_config));
1619 totempg_totem_config->interfaces = temp_if;
1620}