91#include <sys/socket.h>
92#include <netinet/in.h>
104#include <qb/qblist.h>
105#include <qb/qbloop.h>
106#include <qb/qbipcs.h>
108#define LOGSYS_UTILS_ONLY 1
119#if !(defined(__i386__) || defined(__x86_64__))
123#define TOTEMPG_NEED_ALIGN 1
153#define TOTEMPG_PACKET_SIZE (totempg_totem_config->net_mtu - \
154 sizeof (struct totempg_mcast))
161static int mcast_packed_msg_count = 0;
163static int totempg_reserved = 1;
165static unsigned int totempg_size_limit;
169static uint32_t totempg_threaded_mode = 0;
171static void *totemsrp_context;
176static int totempg_log_level_security;
177static int totempg_log_level_error;
178static int totempg_log_level_warning;
179static int totempg_log_level_notice;
180static int totempg_log_level_debug;
181static int totempg_subsys_id;
182static void (*totempg_log_printf) (
234static unsigned char *fragmentation_data;
236static int fragment_size = 0;
238static int fragment_continuation = 0;
240static int totempg_waiting_transack = 0;
246 unsigned int msg_len,
251 const unsigned int *member_list,
size_t member_list_entries,
264static unsigned char next_fragment = 1;
272#define log_printf(level, format, args...) \
274 totempg_log_printf(level, \
276 __FUNCTION__, __FILE__, __LINE__, \
280static int msg_count_send_ok (
int msg_count);
282static int byte_count_send_ok (
int byte_count);
284static void totempg_waiting_trans_ack_cb (
int waiting_trans_ack)
287 totempg_waiting_transack = waiting_trans_ack;
296 if (totempg_waiting_transack) {
352static void assembly_deref_from_normal_and_trans (
int nodeid)
359 for (
j = 0;
j < 2;
j++) {
378static inline void app_confchg_fn (
380 const unsigned int *member_list,
size_t member_list_entries,
395 assembly_deref_from_normal_and_trans (
left_list[
i]);
415static inline void group_endian_convert (
419 unsigned short *group_len;
423#ifdef TOTEMPG_NEED_ALIGN
427 if ((
size_t)msg %
sizeof(
char *) != 0) {
438 group_len[0] =
swab16(group_len[0]);
439 for (
i = 1;
i < group_len[0] + 1;
i++) {
440 group_len[
i] =
swab16(group_len[
i]);
448static inline int group_matches (
455 unsigned short *group_len;
459#ifdef TOTEMPG_NEED_ALIGN
465#ifdef TOTEMPG_NEED_ALIGN
469 if ((
size_t)
iovec->iov_base %
sizeof(
char *) != 0) {
477 group_len = (
unsigned short *)
iovec->iov_base;
478 group_name = ((
char *)
iovec->iov_base) +
479 sizeof (
unsigned short) * (group_len[0] + 1);
486 for (
i = 1;
i < group_len[0] + 1;
i++) {
493 for (
i = 1;
i < group_len[0] + 1;
i++) {
495 if ((group_len[
i] ==
groups_b[
j].group_len) &&
500 group_name += group_len[
i];
506static inline void app_deliver_fn (
509 unsigned int msg_len,
521 group_endian_convert (msg, msg_len);
530#ifdef TOTEMPG_NEED_ALIGN
550#ifdef TOTEMPG_NEED_ALIGN
574static void totempg_confchg_fn (
576 const unsigned int *member_list,
size_t member_list_entries,
583 member_list, member_list_entries,
589static void totempg_deliver_fn (
592 unsigned int msg_len,
613 "Message (totempg_mcast) received from node " CS_PRI_NODE_ID " is too short... Ignoring.",
nodeid);
634 "Message (totempg_mcast datasize) received from node " CS_PRI_NODE_ID
635 " is too short... Ignoring.",
nodeid);
645 for (
i = 0;
i <
mcast->msg_count;
i++) {
656 " doesn't have expected length of %zu (has %u) bytes... Ignoring.",
691 if (
mcast->fragmented == 0 ||
mcast->fragmented == 1) {
713 log_printf (
LOG_DEBUG,
"fragmented continuation %u is not equal to assembly last_frag_num %u",
719 if (
mcast->fragmented == 0) {
730 if (
mcast->msg_count > 1) {
754 if (totempg_threaded_mode == 1) {
757 if (mcast_packed_msg_count == 0) {
758 if (totempg_threaded_mode == 1) {
764 if (totempg_threaded_mode == 1) {
771 mcast.fragmented = 0;
777 mcast.continuation = fragment_continuation;
778 fragment_continuation = 0;
780 mcast.msg_count = mcast_packed_msg_count;
784 iovecs[1].iov_base = (
void *)mcast_packed_msg_lens;
785 iovecs[1].iov_len = mcast_packed_msg_count *
sizeof (
unsigned short);
786 iovecs[2].iov_base = (
void *)&fragmentation_data[0];
787 iovecs[2].iov_len = fragment_size;
790 mcast_packed_msg_count = 0;
793 if (totempg_threaded_mode == 1) {
818 if (fragmentation_data == 0) {
831 totempg_waiting_trans_ack_cb);
842 callback_token_received_fn,
857 if (totempg_threaded_mode == 1) {
861 if (totempg_threaded_mode == 1) {
869static int mcast_msg (
885 if (totempg_threaded_mode == 1) {
897 sizeof (
struct iovec));
903 (
sizeof (
unsigned short) * (mcast_packed_msg_count + 1));
905 mcast_packed_msg_lens[mcast_packed_msg_count] = 0;
914 if (byte_count_send_ok (
total_size +
sizeof(
unsigned short) *
915 (mcast_packed_msg_count)) == 0) {
917 if (totempg_threaded_mode == 1) {
927 mcast.fragmented = 0;
928 mcast.continuation = fragment_continuation;
940 memcpy (&fragmentation_data[fragment_size],
943 mcast_packed_msg_lens[mcast_packed_msg_count] +=
copy_len;
963 memcpy (&fragmentation_data[fragment_size],
965 mcast_packed_msg_lens[mcast_packed_msg_count] +=
copy_len;
974 if (!next_fragment) {
977 fragment_continuation = next_fragment;
978 mcast.fragmented = next_fragment++;
979 assert(fragment_continuation != 0);
982 fragment_continuation = 0;
988 mcast.msg_count = ++mcast_packed_msg_count;
991 iovecs[1].iov_base = (
void *)mcast_packed_msg_lens;
992 iovecs[1].iov_len = mcast_packed_msg_count *
993 sizeof(
unsigned short);
1005 mcast_packed_msg_lens[0] = 0;
1006 mcast_packed_msg_count = 0;
1032 if (mcast_packed_msg_lens[mcast_packed_msg_count]) {
1033 mcast_packed_msg_count++;
1037 if (totempg_threaded_mode == 1) {
1046static int msg_count_send_ok (
1057static int byte_count_send_ok (
1070static int send_reserve (
1082static void send_release (
1089#ifndef HAVE_SMALL_MEMORY_FOOTPRINT
1090#undef MESSAGE_QUEUE_MAX
1091#define MESSAGE_QUEUE_MAX ((4 * MESSAGE_SIZE_MAX) / totempg_totem_config->net_mtu)
1094static uint32_t q_level_precent_used(
void)
1107 if (totempg_threaded_mode == 1) {
1112 if (totempg_threaded_mode == 1) {
1121 if (totempg_threaded_mode == 1) {
1125 if (totempg_threaded_mode == 1) {
1137 void (*deliver_fn) (
1140 unsigned int msg_len,
1143 void (*confchg_fn) (
1145 const unsigned int *member_list,
size_t member_list_entries,
1152 if (totempg_threaded_mode == 1) {
1157 if (instance ==
NULL) {
1169 if (totempg_threaded_mode == 1) {
1176 if (totempg_threaded_mode == 1) {
1191 if (totempg_threaded_mode == 1) {
1208 if (totempg_threaded_mode == 1) {
1219 if (totempg_threaded_mode == 1) {
1223 if (totempg_threaded_mode == 1) {
1229#define MAX_IOVECS_FROM_APP 32
1230#define MAX_GROUPS_PER_MSG 32
1244 if (totempg_threaded_mode == 1) {
1266 if (totempg_threaded_mode == 1) {
1273static void check_q_level(
1290 totem_queue_level_changed(instance->
q_level);
1299 check_q_level(instance);
1308 unsigned int size = 0;
1312 if (totempg_threaded_mode == 1) {
1321 size +=
iovec[
i].iov_len;
1324 if (size >= totempg_size_limit) {
1329 if (byte_count_send_ok (size)) {
1336 check_q_level(instance);
1338 if (totempg_threaded_mode == 1) {
1348 if (totempg_threaded_mode == 1) {
1352 send_release (msg_count);
1353 if (totempg_threaded_mode == 1) {
1373 if (totempg_threaded_mode == 1) {
1380 group_len[0] = groups_cnt;
1381 for (
i = 0;
i < groups_cnt;
i++) {
1386 iovec_mcast[0].iov_len = (groups_cnt + 1) *
sizeof (
unsigned short);
1395 if (totempg_threaded_mode == 1) {
1411 unsigned int size = 0;
1415 if (totempg_threaded_mode == 1) {
1419 for (
i = 0;
i < groups_cnt;
i++) {
1423 size +=
iovec[
i].iov_len;
1426 res = msg_count_send_ok (size);
1428 if (totempg_threaded_mode == 1) {
1436 unsigned short ip_port,
1486 return &totempg_stats;
1500#define ONE_IFACE_LEN 63
1515 return (
"no interface found for nodeid");
1549 totem_queue_level_changed =
fn;
1587 totempg_threaded_mode = 1;
totem_configuration_type
The totem_configuration_type enum.
totem_callback_token_type
The totem_callback_token_type enum.
@ TOTEM_CALLBACK_TOKEN_RECEIVED
unsigned char last_frag_num
enum throw_away_mode throw_away_mode
unsigned char data[MESSAGE_SIZE_MAX+KNET_MAX_PACKET_SIZE]
struct totem_message_header header
struct totem_logging_configuration totem_logging_configuration
struct totem_interface * interfaces
The totem_ip_address struct.
void(* log_printf)(int level, int subsys, const char *function_name, const char *file_name, int file_line, const char *format,...) __attribute__((format(printf
void(*) in log_level_security)
void(* confchg_fn)(enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id)
struct totempg_group * groups
void(* deliver_fn)(unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required)
struct totempg_mcast_header header
unsigned char continuation
#define swab16(x)
The swab16 macro.
cfg_message_crypto_reconfig_phase_t
const char * totemip_print(const struct totem_ip_address *addr)
int totempg_my_family_get(void)
unsigned int totempg_my_nodeid_get(void)
static void(*) struct totem_config totempg_totem_config)
int totempg_groups_leave(void *totempg_groups_instance, const struct totempg_group *groups, size_t group_cnt)
void totempg_check_q_level(void *totempg_groups_instance)
int totempg_callback_token_create(void **handle_out, enum totem_callback_token_type type, int delete, int(*callback_fn)(enum totem_callback_token_type type, const void *), const void *data)
#define log_printf(level, format, args...)
const char * totempg_ifaces_print(unsigned int nodeid)
int totempg_groups_joined_reserve(void *totempg_groups_instance, const struct iovec *iovec, unsigned int iov_len)
QB_LIST_DECLARE(assembly_list_inuse)
int totempg_groups_initialize(void **totempg_groups_instance, void(*deliver_fn)(unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required), void(*confchg_fn)(enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id))
Initialize a groups instance.
#define MAX_IOVECS_FROM_APP
int totempg_crypto_set(const char *cipher_type, const char *hash_type)
void totempg_get_config(struct totem_config *config)
void * callback_token_received_handle
#define MESSAGE_QUEUE_MAX
int totempg_iface_set(struct totem_ip_address *interface_addr, unsigned short ip_port, unsigned int iface_no)
int totempg_nodestatus_get(unsigned int nodeid, struct totem_node_status *node_status)
void totempg_put_config(struct totem_config *config)
int totempg_reconfigure(void)
#define TOTEMPG_PACKET_SIZE
int totempg_crypto_reconfigure_phase(cfg_message_crypto_reconfig_phase_t phase)
void totempg_trans_ack(void)
void totempg_force_gather(void)
int totempg_member_remove(const struct totem_ip_address *member, int ring_no)
void totempg_queue_level_register_callback(totem_queue_level_changed_fn fn)
int totempg_initialize(qb_loop_t *poll_handle, struct totem_config *totem_config)
Initialize the totem process groups abstraction.
int totempg_groups_joined_release(int msg_count)
#define MAX_GROUPS_PER_MSG
void totempg_service_ready_register(void(*totem_service_ready)(void))
int totempg_ifaces_get(unsigned int nodeid, unsigned int *interface_id, struct totem_ip_address *interfaces, unsigned int interfaces_size, char ***status, unsigned int *iface_count)
void totempg_callback_token_destroy(void *handle_out)
void totempg_event_signal(enum totem_event_type type, int value)
void totempg_stats_clear(int flags)
void totempg_finalize(void)
void * totempg_get_stats(void)
int totempg_groups_mcast_joined(void *totempg_groups_instance, const struct iovec *iovec, unsigned int iov_len, int guarantee)
void totempg_threaded_mode_enable(void)
int totempg_groups_send_ok_groups(void *totempg_groups_instance, const struct totempg_group *groups, size_t groups_cnt, const struct iovec *iovec, unsigned int iov_len)
int totempg_groups_mcast_groups(void *totempg_groups_instance, int guarantee, const struct totempg_group *groups, size_t groups_cnt, const struct iovec *iovec, unsigned int iov_len)
int totempg_member_add(const struct totem_ip_address *member, int ring_no)
int totempg_groups_join(void *totempg_groups_instance, const struct totempg_group *groups, size_t group_cnt)
Totem Single Ring Protocol.
void(* totem_queue_level_changed_fn)(enum totem_q_level level)
int totemsrp_my_family_get(void *srp_context)
void totemsrp_force_gather(void *context)
void totemsrp_service_ready_register(void *context, void(*totem_service_ready)(void))
int totemsrp_initialize(qb_loop_t *poll_handle, void **srp_context, struct totem_config *totem_config, totempg_stats_t *stats, void(*deliver_fn)(unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required), void(*confchg_fn)(enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id), void(*waiting_trans_ack_cb_fn)(int waiting_trans_ack))
Create a protocol instance.
int totemsrp_callback_token_create(void *srp_context, void **handle_out, enum totem_callback_token_type type, int delete, int(*callback_fn)(enum totem_callback_token_type type, const void *), const void *data)
void totemsrp_threaded_mode_enable(void *context)
void totemsrp_net_mtu_adjust(struct totem_config *totem_config)
int totemsrp_nodestatus_get(void *srp_context, unsigned int nodeid, struct totem_node_status *node_status)
int totemsrp_crypto_set(void *srp_context, const char *cipher_type, const char *hash_type)
int totemsrp_avail(void *srp_context)
Return number of available messages that can be queued.
void totemsrp_stats_clear(void *context, int flags)
int totemsrp_iface_set(void *context, const struct totem_ip_address *interface_addr, unsigned short ip_port, unsigned int iface_no)
void totemsrp_finalize(void *srp_context)
struct memb_ring_id ring_id
void totemsrp_trans_ack(void *context)
struct totem_message_header header
int totemsrp_crypto_reconfigure_phase(void *context, struct totem_config *totem_config, cfg_message_crypto_reconfig_phase_t phase)
unsigned int totemsrp_my_nodeid_get(void *srp_context)
void totemsrp_event_signal(void *srp_context, enum totem_event_type type, int value)
void totemsrp_callback_token_destroy(void *srp_context, void **handle_out)
int totemsrp_member_add(void *context, const struct totem_ip_address *member, int iface_no)
int totemsrp_ifaces_get(void *srp_context, unsigned int nodeid, unsigned int *interface_id, struct totem_ip_address *interfaces, unsigned int interfaces_size, char ***status, unsigned int *iface_count)
int totemsrp_reconfigure(void *context, struct totem_config *totem_config)
int totemsrp_member_remove(void *context, const struct totem_ip_address *member, int iface_no)
int totemsrp_mcast(void *srp_context, struct iovec *iovec, unsigned int iov_len, int guarantee)
Multicast a message.
Totem Single Ring Protocol.
#define TOTEMPG_STATS_CLEAR_TOTEM