corosync 3.1.7
sync.c
Go to the documentation of this file.
1/*
2 * Copyright (c) 2009-2012 Red Hat, Inc.
3 *
4 * All rights reserved.
5 *
6 * Author: Steven Dake (sdake@redhat.com)
7 *
8 * This software licensed under BSD license, the text of which follows:
9 *
10 * Redistribution and use in source and binary forms, with or without
11 * modification, are permitted provided that the following conditions are met:
12 *
13 * - Redistributions of source code must retain the above copyright notice,
14 * this list of conditions and the following disclaimer.
15 * - Redistributions in binary form must reproduce the above copyright notice,
16 * this list of conditions and the following disclaimer in the documentation
17 * and/or other materials provided with the distribution.
18 * - Neither the name of the MontaVista Software, Inc. nor the names of its
19 * contributors may be used to endorse or promote products derived from this
20 * software without specific prior written permission.
21 *
22 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
23 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
24 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
25 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
26 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
27 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
28 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
29 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
30 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
31 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
32 * THE POSSIBILITY OF SUCH DAMAGE.
33 */
34#include <config.h>
35
36#include <sys/types.h>
37#include <sys/socket.h>
38#include <sys/un.h>
39#include <sys/ioctl.h>
40#include <netinet/in.h>
41#include <sys/uio.h>
42#include <unistd.h>
43#include <fcntl.h>
44#include <stdlib.h>
45#include <stdio.h>
46#include <errno.h>
47#include <time.h>
48#include <arpa/inet.h>
49
50#include <corosync/corotypes.h>
51#include <corosync/swab.h>
54#include <corosync/logsys.h>
55#include <qb/qbipc_common.h>
56#include "schedwrk.h"
57#include "quorum.h"
58#include "sync.h"
59#include "main.h"
60
62
/*
 * Message type identifiers placed in qb_ipc_request_header.id of messages
 * multicast on the "sync" group; sync_deliver_fn() switches on header->id.
 * NOTE(review): in this listing the opening lines of the enums/structs that
 * follow are missing; only their visible fragments are shown below.
 */
63#define MESSAGE_REQ_SYNC_BARRIER 0
64#define MESSAGE_REQ_SYNC_SERVICE_BUILD 1
65
69};
70
75};
76
/*
 * Fragment of struct service_entry: per-service sync callbacks (copied from
 * struct sync_callbacks in sync_servicelist_build_enter(), or dummy_* for
 * services only known from a peer) and a printable name.
 */
79 void (*sync_init) (
80 const unsigned int *trans_list,
81 size_t trans_list_entries,
82 const unsigned int *member_list,
83 size_t member_list_entries,
84 const struct memb_ring_id *ring_id);
85 void (*sync_abort) (void);
86 int (*sync_process) (void);
87 void (*sync_activate) (void);
89 char name[128];
90};
91
/*
 * Fragment of struct processor_entry: one entry per member of the current
 * ring, used to track which nodes have sent the message a barrier waits for.
 */
93 int nodeid;
95};
96
/*
 * Fragment of struct req_exec_service_build_message: ring id plus the list of
 * service ids the sender knows locally. All fields are 8-byte aligned.
 * NOTE(review): service_list holds at most 128 ids; no visible code checks
 * service_list_entries against that bound on send or receive.
 */
98 struct qb_ipc_request_header header __attribute__((aligned(8)));
99 struct memb_ring_id ring_id __attribute__((aligned(8)));
100 int service_list_entries __attribute__((aligned(8)));
101 int service_list[128] __attribute__((aligned(8)));
102};
103
/*
 * Fragment of struct req_exec_barrier_message: header plus the ring id the
 * barrier belongs to (receivers discard barriers for any other ring).
 */
105 struct qb_ipc_request_header header __attribute__((aligned(8)));
106 struct memb_ring_id ring_id __attribute__((aligned(8)));
107};
108
109static enum sync_state my_state = SYNC_BARRIER;
110
111static struct memb_ring_id my_ring_id;
112
113static int my_processing_idx = 0;
114
115static hdb_handle_t my_schedwrk_handle;
116
117static struct processor_entry my_processor_list[PROCESSOR_COUNT_MAX];
118
119static unsigned int my_member_list[PROCESSOR_COUNT_MAX];
120
121static unsigned int my_trans_list[PROCESSOR_COUNT_MAX];
122
123static size_t my_member_list_entries = 0;
124
125static size_t my_trans_list_entries = 0;
126
127static int my_processor_list_entries = 0;
128
129static struct service_entry my_service_list[SERVICES_COUNT_MAX];
130
131static int my_service_list_entries = 0;
132
133static void (*sync_synchronization_completed) (void);
134
135static void sync_deliver_fn (
136 unsigned int nodeid,
137 const void *msg,
138 unsigned int msg_len,
139 int endian_conversion_required);
140
141static int schedwrk_processor (const void *context);
142
143static void sync_process_enter (void);
144
145static void sync_process_call_init (void);
146
147static struct totempg_group sync_group = {
148 .group = "sync",
149 .group_len = 4
150};
151
152static void *sync_group_handle;
153
/*
 * Tail of the file-scope function pointer my_sync_callbacks_retrieve (its
 * first line is missing from this listing). sync_init() stores the caller's
 * retrieve function here; other functions call it with a service id and
 * treat a -1 result as "service not available".
 */
155 int service_id,
156 struct sync_callbacks *callbacks);
157
/*
 * sync_init (header line missing from this listing).
 *
 * Creates a totempg groups instance delivering to sync_deliver_fn, joins the
 * "sync" group and remembers the two callbacks.
 *
 * @param sync_callbacks_retrieve   fills *callbacks for a service id, or
 *                                  returns -1 when the service is unavailable
 * @param synchronization_completed invoked when a sync round has finished
 * @return 0 on success, -1 (after logging an error) when initializing or
 *         joining the group fails.
 */
159 int (*sync_callbacks_retrieve) (
160 int service_id,
161 struct sync_callbacks *callbacks),
162 void (*synchronization_completed) (void))
163{
/*
 * NOTE(review): res is unsigned but compared against -1 below; this only
 * works because -1 converts to UINT_MAX. The totempg functions return int
 * (see totempg_groups_initialize / totempg_groups_join), so int would be
 * the natural type.
 */
164 unsigned int res;
165
167 &sync_group_handle,
168 sync_deliver_fn,
169 NULL);
170 if (res == -1) {
172 "Couldn't initialize groups interface.");
173 return (-1);
174 }
175
176 res = totempg_groups_join (
177 sync_group_handle,
178 &sync_group,
179 1);
180 if (res == -1) {
181 log_printf (LOGSYS_LEVEL_ERROR, "Couldn't join group.");
182 return (-1);
183 }
184
185 sync_synchronization_completed = synchronization_completed;
186 my_sync_callbacks_retrieve = sync_callbacks_retrieve;
187
188 return (0);
189}
190
/*
 * Handle a MESSAGE_REQ_SYNC_BARRIER message from nodeid.
 *
 * Barriers for a ring other than my_ring_id are dropped. Otherwise the sender
 * is marked as received. Once every member in my_processor_list has been seen,
 * the current service is marked ACTIVATE and its sync_activate() runs (when
 * my_sync_callbacks_retrieve() does not return -1). Processing then moves to
 * the next service, or sync_synchronization_completed() is called after the
 * last one.
 *
 * NOTE(review): the line declaring req_exec_barrier_message is missing from
 * this listing; presumably it is msg viewed as a
 * const struct req_exec_barrier_message *.
 */
191static void sync_barrier_handler (unsigned int nodeid, const void *msg)
192{
194 int i;
195 int barrier_reached = 1;
196
197 if (memcmp (&my_ring_id, &req_exec_barrier_message->ring_id,
198 sizeof (struct memb_ring_id)) != 0) {
199
200 log_printf (LOGSYS_LEVEL_DEBUG, "barrier for old ring - discarding");
201 return;
202 }
	/* Record that the sender reached the barrier. */
203 for (i = 0; i < my_processor_list_entries; i++) {
204 if (my_processor_list[i].nodeid == nodeid) {
205 my_processor_list[i].received = 1;
206 }
207 }
	/* The barrier is reached only when every member has reported. */
208 for (i = 0; i < my_processor_list_entries; i++) {
209 if (my_processor_list[i].received == 0) {
210 barrier_reached = 0;
211 }
212 }
213 if (barrier_reached) {
214 log_printf (LOGSYS_LEVEL_DEBUG, "Committing synchronization for %s",
215 my_service_list[my_processing_idx].name);
216 my_service_list[my_processing_idx].state = ACTIVATE;
217
218 if (my_sync_callbacks_retrieve(my_service_list[my_processing_idx].service_id, NULL) != -1) {
219 my_service_list[my_processing_idx].sync_activate ();
220 }
221
		/* Advance to the next service, or finish the round. */
222 my_processing_idx += 1;
223 if (my_service_list_entries == my_processing_idx) {
224 sync_synchronization_completed ();
225 } else {
226 sync_process_enter ();
227 }
228 }
229}
230
/*
 * Placeholder callbacks for services that a peer announced in its
 * service-build message but that are not known locally.
 * sync_service_build_handler() installs them.
 * Abort and activate do nothing. Process always returns 0, which
 * schedwrk_processor() treats as "done, enter the barrier".
 */
static void dummy_sync_abort (void)
{
	/* no local state to abort */
}

static int dummy_sync_process (void)
{
	return 0;
}

static void dummy_sync_activate (void)
{
	/* no local state to activate */
}
243
244static int service_entry_compare (const void *a, const void *b)
245{
246 const struct service_entry *service_entry_a = a;
247 const struct service_entry *service_entry_b = b;
248
249 return (service_entry_a->service_id > service_entry_b->service_id);
250}
251
/*
 * Handle a MESSAGE_REQ_SYNC_SERVICE_BUILD message from nodeid.
 *
 * Messages for a ring other than my_ring_id are dropped. Any service id in
 * the sender's list that is missing from my_service_list is appended with
 * dummy callbacks, and the list is re-sorted by service id. The sender is then
 * marked as received. Once every member has sent its list, the processing
 * phase starts (sync_process_enter()).
 *
 * NOTE(review): the line declaring req_exec_service_build_message is missing
 * from this listing; presumably it is msg viewed as a
 * const struct req_exec_service_build_message *.
 */
252static void sync_service_build_handler (unsigned int nodeid, const void *msg)
253{
255 int i, j;
256 int barrier_reached = 1;
257 int found;
258 int qsort_trigger = 0;
259
260 if (memcmp (&my_ring_id, &req_exec_service_build_message->ring_id,
261 sizeof (struct memb_ring_id)) != 0) {
262 log_printf (LOGSYS_LEVEL_DEBUG, "service build for old ring - discarding");
263 return;
264 }
	/*
	 * NOTE(review): service_list_entries comes from the peer and is not
	 * checked against the 128-entry service_list, and my_service_list_entries
	 * is not checked against SERVICES_COUNT_MAX before appending. An
	 * oversized or malformed message could read or write out of bounds.
	 */
265 for (i = 0; i < req_exec_service_build_message->service_list_entries; i++) {
266
267 found = 0;
268 for (j = 0; j < my_service_list_entries; j++) {
269 if (req_exec_service_build_message->service_list[i] ==
270 my_service_list[j].service_id) {
271 found = 1;
272 break;
273 }
274 }
275 if (found == 0) {
			/*
			 * Service known only to the peer: add a placeholder entry so
			 * that this node takes part in its barriers.
			 * NOTE(review): the name format ends in "\n", which then
			 * appears inside the "Committing synchronization for %s" log
			 * line; probably unintended.
			 */
276 my_service_list[my_service_list_entries].state = PROCESS;
277 my_service_list[my_service_list_entries].service_id =
278 req_exec_service_build_message->service_list[i];
279 sprintf (my_service_list[my_service_list_entries].name,
280 "Unknown External Service (id = %d)\n",
281 req_exec_service_build_message->service_list[i]);
282 my_service_list[my_service_list_entries].sync_init =
283 NULL;
284 my_service_list[my_service_list_entries].sync_abort =
285 dummy_sync_abort;
286 my_service_list[my_service_list_entries].sync_process =
287 dummy_sync_process;
288 my_service_list[my_service_list_entries].sync_activate =
289 dummy_sync_activate;
290 my_service_list_entries += 1;
291
292 qsort_trigger = 1;
293 }
294 }
	/* Keep my_service_list ordered by service id after additions. */
295 if (qsort_trigger) {
296 qsort (my_service_list, my_service_list_entries,
297 sizeof (struct service_entry), service_entry_compare);
298 }
299 for (i = 0; i < my_processor_list_entries; i++) {
300 if (my_processor_list[i].nodeid == nodeid) {
301 my_processor_list[i].received = 1;
302 }
303 }
304 for (i = 0; i < my_processor_list_entries; i++) {
305 if (my_processor_list[i].received == 0) {
306 barrier_reached = 0;
307 }
308 }
309 if (barrier_reached) {
310 log_printf (LOGSYS_LEVEL_DEBUG, "enter sync process");
311 sync_process_enter ();
312 }
313}
314
/*
 * totempg delivery callback for the "sync" group.
 * It dispatches on header->id to sync_barrier_handler() or
 * sync_service_build_handler().
 *
 * NOTE(review): the two case labels are missing from this listing; presumably
 * they are MESSAGE_REQ_SYNC_BARRIER and MESSAGE_REQ_SYNC_SERVICE_BUILD. Other
 * ids fall through the switch and are ignored.
 * NOTE(review): msg_len is not checked against the message struct sizes, and
 * endian_conversion_required is ignored, so id and integer fields from a
 * node with different byte order would be misread. Confirm whether a
 * mixed-endian cluster is supported.
 */
315static void sync_deliver_fn (
316 unsigned int nodeid,
317 const void *msg,
318 unsigned int msg_len,
319 int endian_conversion_required)
320{
321 struct qb_ipc_request_header *header = (struct qb_ipc_request_header *)msg;
322
323 switch (header->id) {
325 sync_barrier_handler (nodeid, msg);
326 break;
328 sync_service_build_handler (nodeid, msg);
329 break;
330 }
331}
332
/*
 * Multicast a barrier message for the current ring (my_ring_id) to the
 * "sync" group with TOTEMPG_AGREED ordering. The multicast result is
 * deliberately discarded.
 *
 * NOTE(review): the local declaration of req_exec_barrier_message and the
 * lines between the header.size assignment and the memcpy (presumably a
 * memset and the header.id assignment) are missing from this listing.
 */
333static void barrier_message_transmit (void)
334{
335 struct iovec iovec;
337
339
340 req_exec_barrier_message.header.size = sizeof (struct req_exec_barrier_message);
342
343 memcpy (&req_exec_barrier_message.ring_id, &my_ring_id,
344 sizeof (struct memb_ring_id));
345
346 iovec.iov_base = (char *)&req_exec_barrier_message;
347 iovec.iov_len = sizeof (req_exec_barrier_message);
348
349 (void)totempg_groups_mcast_joined (sync_group_handle,
350 &iovec, 1, TOTEMPG_AGREED);
351}
352
353static void service_build_message_transmit (struct req_exec_service_build_message *service_build_message)
354{
355 struct iovec iovec;
356
357 service_build_message->header.size = sizeof (struct req_exec_service_build_message);
358 service_build_message->header.id = MESSAGE_REQ_SYNC_SERVICE_BUILD;
359
360 memcpy (&service_build_message->ring_id, &my_ring_id,
361 sizeof (struct memb_ring_id));
362
363 iovec.iov_base = (void *)service_build_message;
364 iovec.iov_len = sizeof (struct req_exec_service_build_message);
365
366 (void)totempg_groups_mcast_joined (sync_group_handle,
367 &iovec, 1, TOTEMPG_AGREED);
368}
369
370static void sync_barrier_enter (void)
371{
372 my_state = SYNC_BARRIER;
373 barrier_message_transmit ();
374}
375
376static void sync_process_call_init (void)
377{
378 unsigned int old_trans_list[PROCESSOR_COUNT_MAX];
379 size_t old_trans_list_entries = 0;
380 int o, m;
381 int i;
382
383 memcpy (old_trans_list, my_trans_list, my_trans_list_entries *
384 sizeof (unsigned int));
385 old_trans_list_entries = my_trans_list_entries;
386
387 my_trans_list_entries = 0;
388 for (o = 0; o < old_trans_list_entries; o++) {
389 for (m = 0; m < my_member_list_entries; m++) {
390 if (old_trans_list[o] == my_member_list[m]) {
391 my_trans_list[my_trans_list_entries] = my_member_list[m];
392 my_trans_list_entries++;
393 break;
394 }
395 }
396 }
397
398 for (i = 0; i < my_service_list_entries; i++) {
399 if (my_sync_callbacks_retrieve(my_service_list[i].service_id, NULL) != -1) {
400 my_service_list[i].sync_init (my_trans_list,
401 my_trans_list_entries, my_member_list,
402 my_member_list_entries,
403 &my_ring_id);
404 }
405 }
406}
407
408static void sync_process_enter (void)
409{
410 int i;
411
412 my_state = SYNC_PROCESS;
413
414 /*
415 * No sync services
416 */
417 if (my_service_list_entries == 0) {
418 my_state = SYNC_SERVICELIST_BUILD;
419 sync_synchronization_completed ();
420 return;
421 }
422 for (i = 0; i < my_processor_list_entries; i++) {
423 my_processor_list[i].received = 0;
424 }
425
426 schedwrk_create (&my_schedwrk_handle,
427 schedwrk_processor,
428 NULL);
429}
430
/*
 * Start a sync round in the SYNC_SERVICELIST_BUILD phase.
 *
 * Resets per-member flags and copies member_list into my_processor_list and
 * my_member_list. Rebuilds my_service_list from every service id
 * (0..SERVICES_COUNT_MAX-1) whose callbacks can be retrieved and have a
 * non-NULL sync_init. Multicasts the resulting id list as a service-build
 * message. Finally calls sync_init() for the locally known services.
 *
 * NOTE(review): three lines are missing from this listing: the declaration of
 * sync_callbacks, the assignment of res (presumably from
 * my_sync_callbacks_retrieve (i, &sync_callbacks)), and the second strcpy
 * argument (presumably sync_callbacks.name).
 * NOTE(review): ring_id is not used in the visible body; sync_start() copies
 * it into my_ring_id before calling here.
 * NOTE(review): member_list_entries is not checked against
 * PROCESSOR_COUNT_MAX before filling my_processor_list / my_member_list.
 */
431static void sync_servicelist_build_enter (
432 const unsigned int *member_list,
433 size_t member_list_entries,
434 const struct memb_ring_id *ring_id)
435{
436 struct req_exec_service_build_message service_build;
437 int i;
438 int res;
440
441 memset(&service_build, 0, sizeof(service_build));
442
443 my_state = SYNC_SERVICELIST_BUILD;
444 for (i = 0; i < member_list_entries; i++) {
445 my_processor_list[i].nodeid = member_list[i];
446 my_processor_list[i].received = 0;
447 }
448 my_processor_list_entries = member_list_entries;
449
450 memcpy (my_member_list, member_list,
451 member_list_entries * sizeof (unsigned int));
452 my_member_list_entries = member_list_entries;
453
454 my_processing_idx = 0;
455
456 memset(my_service_list, 0, sizeof (struct service_entry) * SERVICES_COUNT_MAX);
457 my_service_list_entries = 0;
458
	/* Collect locally registered services that take part in sync. */
459 for (i = 0; i < SERVICES_COUNT_MAX; i++) {
461 if (res == -1) {
462 continue;
463 }
464 if (sync_callbacks.sync_init == NULL) {
465 continue;
466 }
467 my_service_list[my_service_list_entries].state = PROCESS;
468 my_service_list[my_service_list_entries].service_id = i;
469
		/*
		 * NOTE(review): this assert is the only length guard for the
		 * strcpy below. Under NDEBUG it is compiled out and an overlong
		 * name would overflow the 128-byte name field.
		 */
470 assert(strlen(sync_callbacks.name) < sizeof(my_service_list[my_service_list_entries].name));
471
472 strcpy (my_service_list[my_service_list_entries].name,
474 my_service_list[my_service_list_entries].sync_init = sync_callbacks.sync_init;
475 my_service_list[my_service_list_entries].sync_process = sync_callbacks.sync_process;
476 my_service_list[my_service_list_entries].sync_abort = sync_callbacks.sync_abort;
477 my_service_list[my_service_list_entries].sync_activate = sync_callbacks.sync_activate;
478 my_service_list_entries += 1;
479 }
480
	/*
	 * Announce the local service ids to the other members.
	 * NOTE(review): service_list has 128 slots; this assumes
	 * my_service_list_entries (at most SERVICES_COUNT_MAX) never exceeds that.
	 */
481 for (i = 0; i < my_service_list_entries; i++) {
482 service_build.service_list[i] =
483 my_service_list[i].service_id;
484 }
485 service_build.service_list_entries = my_service_list_entries;
486
487 service_build_message_transmit (&service_build);
488
489 log_printf (LOGSYS_LEVEL_DEBUG, "call init for locally known services");
490 sync_process_call_init ();
491}
492
493static int schedwrk_processor (const void *context)
494{
495 int res = 0;
496
497 if (my_service_list[my_processing_idx].state == PROCESS) {
498 if (my_sync_callbacks_retrieve(my_service_list[my_processing_idx].service_id, NULL) != -1) {
499 res = my_service_list[my_processing_idx].sync_process ();
500 } else {
501 res = 0;
502 }
503 if (res == 0) {
504 sync_barrier_enter();
505 } else {
506 return (-1);
507 }
508 }
509 return (0);
510}
511
/*
 * sync_start (its header line, "void sync_start (", is missing from this
 * listing).
 *
 * Begins a sync round for a new membership. It records ring_id as my_ring_id,
 * so that only messages for this ring are accepted, and then enters the
 * service-list build phase with member_list.
 */
513 const unsigned int *member_list,
514 size_t member_list_entries,
515 const struct memb_ring_id *ring_id)
516{
517 ENTER();
518 memcpy (&my_ring_id, ring_id, sizeof (struct memb_ring_id));
519
520 sync_servicelist_build_enter (member_list, member_list_entries,
521 ring_id);
522}
523
/*
 * sync_save_transitional (its header line is missing from this listing).
 *
 * Saves member_list as the transitional membership. sync_process_call_init()
 * later filters it against the new member list. ring_id is not used here.
 * NOTE(review): member_list_entries is not checked against
 * PROCESSOR_COUNT_MAX, the capacity of my_trans_list.
 */
525 const unsigned int *member_list,
526 size_t member_list_entries,
527 const struct memb_ring_id *ring_id)
528{
529 ENTER();
530 memcpy (my_trans_list, member_list, member_list_entries *
531 sizeof (unsigned int));
532 my_trans_list_entries = member_list_entries;
533}
534
535void sync_abort (void)
536{
537 ENTER();
538 if (my_state == SYNC_PROCESS) {
539 schedwrk_destroy (my_schedwrk_handle);
540 if (my_sync_callbacks_retrieve(my_service_list[my_processing_idx].service_id, NULL) != -1) {
541 my_service_list[my_processing_idx].sync_abort ();
542 }
543 }
544
545 /* this will cause any "old" barrier messages from causing
546 * problems.
547 */
548 memset (&my_ring_id, 0, sizeof (struct memb_ring_id));
549}
unsigned int nodeid
Definition: coroapi.h:0
#define SERVICES_COUNT_MAX
Definition: coroapi.h:462
#define PROCESSOR_COUNT_MAX
Definition: coroapi.h:96
qb_handle_t hdb_handle_t
Definition: hdb.h:52
#define LOGSYS_LEVEL_ERROR
Definition: logsys.h:72
#define log_printf(level, format, args...)
Definition: logsys.h:332
#define LOGSYS_LEVEL_DEBUG
Definition: logsys.h:76
#define ENTER
Definition: logsys.h:333
int schedwrk_create(hdb_handle_t *handle, int(schedwrk_fn)(const void *), const void *context)
Definition: schedwrk.c:138
void schedwrk_destroy(hdb_handle_t handle)
Definition: schedwrk.c:154
The memb_ring_id struct.
Definition: coroapi.h:122
Definition: sync.c:92
int nodeid
Definition: sync.c:93
int received
Definition: sync.c:94
struct qb_ipc_request_header header __attribute__((aligned(8)))
int service_list_entries __attribute__((aligned(8)))
int service_list[128] __attribute__((aligned(8)))
struct qb_ipc_request_header header __attribute__((aligned(8)))
Definition: sync.c:77
enum sync_process_state state
Definition: sync.c:88
int service_id
Definition: sync.c:78
char name[128]
Definition: sync.c:89
void(* sync_abort)(void)
Definition: sync.c:85
void(* sync_activate)(void)
Definition: sync.c:87
int(* sync_process)(void)
Definition: sync.c:86
void(* sync_init)(const unsigned int *trans_list, size_t trans_list_entries, const unsigned int *member_list, size_t member_list_entries, const struct memb_ring_id *ring_id)
Definition: sync.c:79
void(* sync_init)(const unsigned int *trans_list, size_t trans_list_entries, const unsigned int *member_list, size_t member_list_entries, const struct memb_ring_id *ring_id)
Definition: sync.h:39
const char * name
Definition: sync.h:48
int(* sync_process)(void)
Definition: sync.h:45
void(* sync_activate)(void)
Definition: sync.h:46
void(* sync_abort)(void)
Definition: sync.h:47
const void * group
Definition: totempg.h:56
int sync_init(int(*sync_callbacks_retrieve)(int service_id, struct sync_callbacks *callbacks), void(*synchronization_completed)(void))
Definition: sync.c:158
void sync_abort(void)
Definition: sync.c:535
#define MESSAGE_REQ_SYNC_BARRIER
Definition: sync.c:63
sync_process_state
Definition: sync.c:66
@ ACTIVATE
Definition: sync.c:68
@ PROCESS
Definition: sync.c:67
sync_state
Definition: sync.c:71
@ SYNC_SERVICELIST_BUILD
Definition: sync.c:72
@ SYNC_PROCESS
Definition: sync.c:73
@ SYNC_BARRIER
Definition: sync.c:74
#define MESSAGE_REQ_SYNC_SERVICE_BUILD
Definition: sync.c:64
void sync_save_transitional(const unsigned int *member_list, size_t member_list_entries, const struct memb_ring_id *ring_id)
Definition: sync.c:524
int(* my_sync_callbacks_retrieve)(int service_id, struct sync_callbacks *callbacks)
Definition: sync.c:154
void sync_start(const unsigned int *member_list, size_t member_list_entries, const struct memb_ring_id *ring_id)
Definition: sync.c:512
LOGSYS_DECLARE_SUBSYS("SYNC")
Totem Single Ring Protocol.
int totempg_groups_mcast_joined(void *instance, const struct iovec *iovec, unsigned int iov_len, int guarantee)
Definition: totempg.c:1232
int totempg_groups_join(void *instance, const struct totempg_group *groups, size_t group_cnt)
Definition: totempg.c:1182
int totempg_groups_initialize(void **instance, void(*deliver_fn)(unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required), void(*confchg_fn)(enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id))
Initialize a groups instance.
Definition: totempg.c:1134
#define TOTEMPG_AGREED
Definition: totempg.h:60
struct memb_ring_id ring_id
Definition: totemsrp.c:4
struct totem_message_header header
Definition: totemsrp.c:0