37#include <sys/socket.h>
40#include <netinet/in.h>
55#include <qb/qbipc_common.h>
63#define MESSAGE_REQ_SYNC_BARRIER 0
64#define MESSAGE_REQ_SYNC_SERVICE_BUILD 1
80 const unsigned int *trans_list,
81 size_t trans_list_entries,
82 const unsigned int *member_list,
83 size_t member_list_entries,
113static int my_processing_idx = 0;
123static size_t my_member_list_entries = 0;
125static size_t my_trans_list_entries = 0;
127static int my_processor_list_entries = 0;
131static int my_service_list_entries = 0;
133static void (*sync_synchronization_completed) (void);
135static void sync_deliver_fn (
138 unsigned int msg_len,
139 int endian_conversion_required);
141static int schedwrk_processor (
const void *context);
143static void sync_process_enter (
void);
145static void sync_process_call_init (
void);
152static void *sync_group_handle;
159 int (*sync_callbacks_retrieve) (
162 void (*synchronization_completed) (
void))
172 "Couldn't initialize groups interface.");
185 sync_synchronization_completed = synchronization_completed;
191static void sync_barrier_handler (
unsigned int nodeid,
const void *msg)
195 int barrier_reached = 1;
203 for (i = 0; i < my_processor_list_entries; i++) {
208 for (i = 0; i < my_processor_list_entries; i++) {
209 if (my_processor_list[i].received == 0) {
213 if (barrier_reached) {
215 my_service_list[my_processing_idx].name);
222 my_processing_idx += 1;
223 if (my_service_list_entries == my_processing_idx) {
224 sync_synchronization_completed ();
226 sync_process_enter ();
231static void dummy_sync_abort (
void)
235static int dummy_sync_process (
void)
240static void dummy_sync_activate (
void)
244static int service_entry_compare (
const void *a,
const void *b)
252static void sync_service_build_handler (
unsigned int nodeid,
const void *msg)
256 int barrier_reached = 1;
258 int qsort_trigger = 0;
268 for (j = 0; j < my_service_list_entries; j++) {
270 my_service_list[j].service_id) {
276 my_service_list[my_service_list_entries].
state =
PROCESS;
277 my_service_list[my_service_list_entries].
service_id =
279 sprintf (my_service_list[my_service_list_entries].name,
280 "Unknown External Service (id = %d)\n",
282 my_service_list[my_service_list_entries].
sync_init =
284 my_service_list[my_service_list_entries].
sync_abort =
290 my_service_list_entries += 1;
296 qsort (my_service_list, my_service_list_entries,
299 for (i = 0; i < my_processor_list_entries; i++) {
304 for (i = 0; i < my_processor_list_entries; i++) {
305 if (my_processor_list[i].received == 0) {
309 if (barrier_reached) {
311 sync_process_enter ();
315static void sync_deliver_fn (
318 unsigned int msg_len,
319 int endian_conversion_required)
321 struct qb_ipc_request_header *
header = (
struct qb_ipc_request_header *)msg;
325 sync_barrier_handler (
nodeid, msg);
328 sync_service_build_handler (
nodeid, msg);
333static void barrier_message_transmit (
void)
360 memcpy (&service_build_message->ring_id, &my_ring_id,
363 iovec.iov_base = (
void *)service_build_message;
370static void sync_barrier_enter (
void)
373 barrier_message_transmit ();
376static void sync_process_call_init (
void)
379 size_t old_trans_list_entries = 0;
383 memcpy (old_trans_list, my_trans_list, my_trans_list_entries *
384 sizeof (
unsigned int));
385 old_trans_list_entries = my_trans_list_entries;
387 my_trans_list_entries = 0;
388 for (o = 0; o < old_trans_list_entries; o++) {
389 for (m = 0; m < my_member_list_entries; m++) {
390 if (old_trans_list[o] == my_member_list[m]) {
391 my_trans_list[my_trans_list_entries] = my_member_list[m];
392 my_trans_list_entries++;
398 for (i = 0; i < my_service_list_entries; i++) {
400 my_service_list[i].
sync_init (my_trans_list,
401 my_trans_list_entries, my_member_list,
402 my_member_list_entries,
408static void sync_process_enter (
void)
417 if (my_service_list_entries == 0) {
419 sync_synchronization_completed ();
422 for (i = 0; i < my_processor_list_entries; i++) {
431static void sync_servicelist_build_enter (
432 const unsigned int *member_list,
433 size_t member_list_entries,
441 memset(&service_build, 0,
sizeof(service_build));
444 for (i = 0; i < member_list_entries; i++) {
445 my_processor_list[i].
nodeid = member_list[i];
448 my_processor_list_entries = member_list_entries;
450 memcpy (my_member_list, member_list,
451 member_list_entries *
sizeof (
unsigned int));
452 my_member_list_entries = member_list_entries;
454 my_processing_idx = 0;
457 my_service_list_entries = 0;
467 my_service_list[my_service_list_entries].
state =
PROCESS;
468 my_service_list[my_service_list_entries].
service_id = i;
470 assert(strlen(
sync_callbacks.
name) <
sizeof(my_service_list[my_service_list_entries].name));
472 strcpy (my_service_list[my_service_list_entries].
name,
478 my_service_list_entries += 1;
481 for (i = 0; i < my_service_list_entries; i++) {
482 service_build.service_list[i] =
485 service_build.service_list_entries = my_service_list_entries;
487 service_build_message_transmit (&service_build);
490 sync_process_call_init ();
493static int schedwrk_processor (
const void *context)
497 if (my_service_list[my_processing_idx].state ==
PROCESS) {
499 res = my_service_list[my_processing_idx].
sync_process ();
504 sync_barrier_enter();
513 const unsigned int *member_list,
514 size_t member_list_entries,
520 sync_servicelist_build_enter (member_list, member_list_entries,
525 const unsigned int *member_list,
526 size_t member_list_entries,
530 memcpy (my_trans_list, member_list, member_list_entries *
531 sizeof (
unsigned int));
532 my_trans_list_entries = member_list_entries;
541 my_service_list[my_processing_idx].
sync_abort ();
#define SERVICES_COUNT_MAX
#define PROCESSOR_COUNT_MAX
#define LOGSYS_LEVEL_ERROR
#define log_printf(level, format, args...)
#define LOGSYS_LEVEL_DEBUG
int schedwrk_create(hdb_handle_t *handle, int(schedwrk_fn)(const void *), const void *context)
void schedwrk_destroy(hdb_handle_t handle)
struct qb_ipc_request_header header __attribute__((aligned(8)))
int service_list_entries __attribute__((aligned(8)))
int service_list[128] __attribute__((aligned(8)))
struct qb_ipc_request_header header __attribute__((aligned(8)))
enum sync_process_state state
void(* sync_activate)(void)
int(* sync_process)(void)
void(* sync_init)(const unsigned int *trans_list, size_t trans_list_entries, const unsigned int *member_list, size_t member_list_entries, const struct memb_ring_id *ring_id)
void(* sync_init)(const unsigned int *trans_list, size_t trans_list_entries, const unsigned int *member_list, size_t member_list_entries, const struct memb_ring_id *ring_id)
int(* sync_process)(void)
void(* sync_activate)(void)
int sync_init(int(*sync_callbacks_retrieve)(int service_id, struct sync_callbacks *callbacks), void(*synchronization_completed)(void))
#define MESSAGE_REQ_SYNC_BARRIER
#define MESSAGE_REQ_SYNC_SERVICE_BUILD
void sync_save_transitional(const unsigned int *member_list, size_t member_list_entries, const struct memb_ring_id *ring_id)
int(* my_sync_callbacks_retrieve)(int service_id, struct sync_callbacks *callbacks)
void sync_start(const unsigned int *member_list, size_t member_list_entries, const struct memb_ring_id *ring_id)
LOGSYS_DECLARE_SUBSYS("SYNC")
Totem Single Ring Protocol.
int totempg_groups_mcast_joined(void *instance, const struct iovec *iovec, unsigned int iov_len, int guarantee)
int totempg_groups_join(void *instance, const struct totempg_group *groups, size_t group_cnt)
int totempg_groups_initialize(void **instance, void(*deliver_fn)(unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required), void(*confchg_fn)(enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id))
Initialize a groups instance.
struct memb_ring_id ring_id
struct totem_message_header header