corosync 3.1.7
vsf_ykd.c
Go to the documentation of this file.
1/*
2 * Copyright (c) 2005 MontaVista Software, Inc.
3 * Copyright (c) 2006-2012 Red Hat, Inc.
4 *
5 * All rights reserved.
6 *
7 * Author: Steven Dake (sdake@redhat.com)
8 *
9 * This software licensed under BSD license, the text of which follows:
10 *
11 * Redistribution and use in source and binary forms, with or without
12 * modification, are permitted provided that the following conditions are met:
13 *
14 * - Redistributions of source code must retain the above copyright notice,
15 * this list of conditions and the following disclaimer.
16 * - Redistributions in binary form must reproduce the above copyright notice,
17 * this list of conditions and the following disclaimer in the documentation
18 * and/or other materials provided with the distribution.
19 * - Neither the name of the MontaVista Software, Inc. nor the names of its
20 * contributors may be used to endorse or promote products derived from this
21 * software without specific prior written permission.
22 *
23 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
24 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
25 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
26 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
27 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
28 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
29 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
30 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
31 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
32 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
33 * THE POSSIBILITY OF SUCH DAMAGE.
34 */
35
36#include <config.h>
37
38#include <assert.h>
39#include <pwd.h>
40#include <grp.h>
41#include <sys/types.h>
42#include <sys/poll.h>
43#include <sys/uio.h>
44#include <sys/mman.h>
45#include <sys/socket.h>
46#include <sys/un.h>
47#include <sys/time.h>
48#include <sys/resource.h>
49#include <netinet/in.h>
50#include <arpa/inet.h>
51#include <unistd.h>
52#include <fcntl.h>
53#include <stdlib.h>
54#include <stdio.h>
55#include <errno.h>
56#include <sched.h>
57#include <time.h>
58
59#include "quorum.h"
60#include <corosync/logsys.h>
61#include <corosync/corotypes.h>
62#include <qb/qbipc_common.h>
63#include <corosync/mar_gen.h>
64#include <corosync/coroapi.h>
65#include <corosync/swab.h>
66
67#include "vsf_ykd.h"
68
70
71#define YKD_PROCESSOR_COUNT_MAX 32
72
76};
77
81};
82
/*
 * Fixed-size header placed in the first iovec of every YKD multicast
 * (see ykd_state_send_msg and ykd_attempt_send_msg).
 * id: message type tag - presumably one of enum ykd_header_values; the
 * assignments are not visible in this listing, confirm against upstream.
 */
83struct ykd_header {
84 int id;
85};
86
91};
92
93struct ykd_state {
95
97
99
101
103
105};
106
108 unsigned int nodeid;
111};
112
114
/* Process-group handle filled in by api->tpg_init() and used for joins and multicasts. */
115static void *ykd_group_handle;
116
/* Membership table rebuilt on each regular configuration change (one nodeid per member, received cleared). */
117static struct state_received state_received_confchg[YKD_PROCESSOR_COUNT_MAX];
118
/* Number of valid entries in state_received_confchg. */
119static int state_received_confchg_entries;
120
/* Working copy of the table above; delivery marks senders as received and stores their state here. */
121static struct state_received state_received_process[YKD_PROCESSOR_COUNT_MAX];
122
/* Number of valid entries in state_received_process. */
123static int state_received_process_entries;
124
/* Current protocol phase; selects the branch taken in ykd_deliver_fn(). */
125static enum ykd_mode ykd_mode;
126
/* Node ids of the current view, copied from the last regular configuration change. */
127static unsigned int ykd_view_list[YKD_PROCESSOR_COUNT_MAX];
128
/* Number of valid entries in ykd_view_list. */
129static int ykd_view_list_entries;
130
/* Largest session id found by compute() across the received states. */
131static int session_id_max;
132
/* last_primary record with the largest session id, selected by compute(). */
133static struct ykd_session *last_primary_max;
134
/* Ambiguous sessions gathered by compute() from the received states. */
135static struct ykd_session ambiguous_sessions_max[YKD_PROCESSOR_COUNT_MAX];
136
/* Number of valid entries in ambiguous_sessions_max. */
137static int ambiguous_sessions_max_entries;
138
/* 0 after a configuration change, 1 once this processor is declared part of the primary component. */
139static int ykd_primary_designated = 0;
140
/* Ring id copied from the most recent regular configuration change. */
141static struct memb_ring_id ykd_ring_id;
142
/* NOTE(review): original lines 143 and 145 are absent from this listing. */
144
146
/* Service API table handed to ykd_init(); all totem and scheduling calls go through it. */
147static struct corosync_api_v1 *api;
148
/* Callback installed by ykd_init(); invoked with the view, the designation flag and the ring id. */
149static void (*ykd_primary_callback_fn) (
150 const unsigned int *view_list,
151 size_t view_list_entries,
152 int primary_designated,
153 struct memb_ring_id *ring_id) = NULL;
154
/*
 * Called once from ykd_init() after the process group has been joined.
 * NOTE(review): the body (original lines 157-161) is absent from this
 * listing, so what it initialises cannot be stated from here.
 */
155static void ykd_state_init (void)
156{
162}
163
/*
 * Scheduled-work callback (registered by ykd_state_send): multicasts a
 * ykd_header followed by the local ykd_state to the joined group.
 *
 * context: not referenced in the visible code.
 * Returns the result of api->tpg_joined_mcast().
 *
 * NOTE(review): original lines 170 and 178 are absent from this listing -
 * presumably the header.id assignment and the final mcast argument; confirm
 * against upstream.
 */
164static int ykd_state_send_msg (const void *context)
165{
166 struct iovec iovec[2];
167 struct ykd_header header;
168 int res;
169
171
/* Two-part message: header first, then the whole ykd_state structure. */
172 iovec[0].iov_base = (char *)&header;
173 iovec[0].iov_len = sizeof (struct ykd_header);
174 iovec[1].iov_base = (char *)&ykd_state;
175 iovec[1].iov_len = sizeof (struct ykd_state);
176
177 res = api->tpg_joined_mcast (ykd_group_handle, iovec, 2,
179
180 return (res);
181}
182
/*
 * Queue ykd_state_send_msg through api->schedwrk_create() with a NULL
 * context. The return value of schedwrk_create() is not checked.
 * NOTE(review): original line 186 (the first argument) is absent from this
 * listing.
 */
183static void ykd_state_send (void)
184{
185 api->schedwrk_create (
187 ykd_state_send_msg,
188 NULL);
189}
190
/*
 * Scheduled-work callback (registered by ykd_attempt_send): multicasts a
 * bare ykd_header, with no payload, to the joined group.
 *
 * context: not referenced in the visible code.
 * Returns the result of api->tpg_joined_mcast().
 *
 * NOTE(review): original lines 197 and 203 are absent from this listing -
 * presumably the header.id assignment and the final mcast argument; confirm
 * against upstream.
 */
191static int ykd_attempt_send_msg (const void *context)
192{
193 struct iovec iovec;
194 struct ykd_header header;
195 int res;
196
198
199 iovec.iov_base = (char *)&header;
200 iovec.iov_len = sizeof (struct ykd_header);
201
202 res = api->tpg_joined_mcast (ykd_group_handle, &iovec, 1,
204
205 return (res);
206}
207
/*
 * Queue ykd_attempt_send_msg through api->schedwrk_create() with a NULL
 * context. The return value of schedwrk_create() is not checked.
 * NOTE(review): original line 211 (the first argument) is absent from this
 * listing.
 */
208static void ykd_attempt_send (void)
209{
210 api->schedwrk_create (
212 ykd_attempt_send_msg,
213 NULL);
214}
215
216static void compute (void)
217{
218 int i;
219 int j;
220
221 session_id_max = 0;
222 last_primary_max = &state_received_process[0].ykd_state.last_primary;
223 ambiguous_sessions_max_entries = 0;
224
225 for (i = 0; i < state_received_process_entries; i++) {
226 /*
227 * Calculate maximum session id
228 */
229 if (state_received_process[i].ykd_state.session_id > session_id_max) {
230 session_id_max = state_received_process[i].ykd_state.session_id;
231 }
232
233 /*
234 * Calculate maximum primary id
235 */
236 if (state_received_process[i].ykd_state.last_primary.session_id > last_primary_max->session_id) {
237 last_primary_max = &state_received_process[i].ykd_state.last_primary;
238 }
239
240 /*
241 * generate the maximum ambiguous sessions list
242 */
243 for (j = 0; j < state_received_process[i].ykd_state.ambiguous_sessions_entries; j++) {
244 if (state_received_process[i].ykd_state.ambiguous_sessions[j].session_id > last_primary_max->session_id) {
245 memcpy (&ambiguous_sessions_max[ambiguous_sessions_max_entries],
246 &state_received_process[i].ykd_state.ambiguous_sessions[j],
247 sizeof (struct ykd_session));
248 ambiguous_sessions_max_entries += 1;
249 }
250 }
251 }
252}
253
254static int subquorum (
255 unsigned int *member_list,
256 int member_list_entries,
257 struct ykd_session *session)
258{
259 int intersections = 0;
260 int i;
261 int j;
262
263 for (i = 0; i < member_list_entries; i++) {
264 for (j = 0; j < session->member_list_entries; j++) {
265 if (member_list[i] == session->member_list[j]) {
266 intersections += 1;
267 }
268 }
269 }
270
271 /*
272 * even split
273 */
274 if (intersections == (session->member_list_entries - intersections)) {
275 return (1);
276 } else
277
278 /*
279 * majority split
280 */
281 if (intersections > (session->member_list_entries - intersections)) {
282 return (1);
283 }
284 return (0);
285}
286
287static int decide (void)
288{
289 int i;
290
291 /*
292 * Determine if there is a subquorum
293 */
294 if (subquorum (ykd_view_list, ykd_view_list_entries, last_primary_max) == 0) {
295 return (0);
296 }
297
298 for (i = 0; i < ambiguous_sessions_max_entries; i++) {
299 if (subquorum (ykd_view_list, ykd_view_list_entries, &ambiguous_sessions_max[i]) == 0) {
300 return (0);
301 }
302
303 }
304 return (1);
305}
306
/*
 * Endian-conversion helper for one ykd_session; called by
 * ykd_state_endian_convert for last_primary and for each last_formed and
 * ambiguous_sessions entry.
 * NOTE(review): the conversion statements (original lines 311-313 and
 * 315-316) are absent from this listing; only the loop over
 * member_list_entries is visible.
 */
307static void ykd_session_endian_convert (struct ykd_session *ykd_session)
308{
309 int i;
310
314 for (i = 0; i < ykd_session->member_list_entries; i++) {
317 }
318}
319
/*
 * Convert a received ykd_state in place: last_primary, session_id, then
 * every last_formed and ambiguous_sessions entry.
 * NOTE(review): original lines 325-326 are absent from this listing. As
 * shown, last_formed_entries and ambiguous_sessions_entries are used as loop
 * bounds with no visible swab32 - confirm upstream converts them first.
 */
320static void ykd_state_endian_convert (struct ykd_state *state)
321{
322 int i;
323
324 ykd_session_endian_convert (&state->last_primary);
327 state->session_id = swab32 (state->session_id);
328
329 for (i = 0; i < state->last_formed_entries; i++) {
330 ykd_session_endian_convert (&state->last_formed[i]);
331 }
332
333 for (i = 0; i < state->ambiguous_sessions_entries; i++) {
334 ykd_session_endian_convert (&state->ambiguous_sessions[i]);
335 }
336}
337
/*
 * Message delivery callback registered with api->tpg_init() in ykd_init().
 *
 * Marks the sender's entry in state_received_process as received, checks
 * whether every entry has now reported, then acts according to ykd_mode:
 * in the state-exchange branch the sender's ykd_state is stored and, once
 * all have reported, compute()/decide() run; in YKD_MODE_ATTEMPT, once all
 * have reported, the processor is declared primary and the callback fires.
 *
 * nodeid: id of the sending node, matched against
 *         state_received_process[].nodeid.
 * msg: a struct ykd_header, optionally followed by a struct ykd_state.
 * msg_len: total length of msg in bytes.
 * endian_conversion_required: when non-zero and msg is longer than the
 *         header, the payload is converted with ykd_state_endian_convert().
 *
 * NOTE(review): original lines 355, 398-399, 403, 418, 426, 428-429, 437 and
 * 449-450 are absent from this listing, so the header/mode check, one case
 * label and several statements are not visible here.
 */
338static void ykd_deliver_fn (
339 unsigned int nodeid,
340 const void *msg,
341 unsigned int msg_len,
342 int endian_conversion_required)
343{
344 int all_received = 1;
345 int state_position = 0;
346 int i;
347 struct ykd_header *header = (struct ykd_header *)msg;
348 char *msg_state = (char *)msg + sizeof (struct ykd_header);
349
350 /*
351 * If this is a localhost address, this node is always primary
352 */
353#ifdef TODO
354 if (totemip_localhost_check (source_addr)) {
356 "This processor is within the primary component.");
357 primary_designated = 1;
358
359 ykd_primary_callback_fn (
360 ykd_view_list,
361 ykd_view_list_entries,
362 primary_designated,
363 &ykd_ring_id);
364 return;
365 }
366#endif
367 if (endian_conversion_required &&
368 (msg_len > sizeof (struct ykd_header))) {
369 ykd_state_endian_convert ((struct ykd_state *)msg_state);
370 }
371
372 /*
373 * Set completion for source_addr's address
374 */
/*
 * NOTE(review): if nodeid matches no entry, state_position ends equal to
 * state_received_confchg_entries and is later used as an index by the
 * memcpy below - confirm an unknown sender cannot reach this point.
 */
375 for (state_position = 0; state_position < state_received_confchg_entries; state_position++) {
376 if (nodeid == state_received_process[state_position].nodeid) {
377 /*
378 * State position contains the address of the state to modify
379 * This may be used later by the other algorithms
380 */
381 state_received_process[state_position].received = 1;
382 break;
383 }
384 }
385
386 /*
387 * Test if all nodes have submitted their state data
388 */
389 for (i = 0; i < state_received_confchg_entries; i++) {
390 if (state_received_process[i].received == 0) {
391 all_received = 0;
392 }
393 }
394
395 /*
396 * Ignore messages from a different state
397 */
400 return;
401
402 switch (ykd_mode) {
/*
 * NOTE(review): the length check is an assert only and requires just
 * "longer than the header", while the memcpy below reads a full
 * struct ykd_state - confirm the transport guarantees the payload size.
 */
404 assert (msg_len > sizeof (struct ykd_header));
405 /*
406 * Copy state information for the sending processor
407 */
408 memcpy (&state_received_process[state_position].ykd_state,
409 msg_state, sizeof (struct ykd_state));
410
411 /*
412 * Try to form a component
413 */
414 if (all_received) {
415 for (i = 0; i < state_received_confchg_entries; i++) {
416 state_received_process[i].received = 0;
417 }
419
420// TODO resolve optimizes for failure conditions during ykd calculation
421// resolve();
422 compute();
423
424 if (decide ()) {
425 ykd_state.session_id = session_id_max + 1;
427 ykd_view_list, sizeof (unsigned int) * ykd_view_list_entries);
430 ykd_attempt_send();
431 }
432 }
433 break;
434
435 case YKD_MODE_ATTEMPT:
436 if (all_received) {
438 "This processor is within the primary component.");
439 ykd_primary_designated = 1;
440
441 ykd_primary_callback_fn (
442 ykd_view_list,
443 ykd_view_list_entries,
444 ykd_primary_designated,
445 &ykd_ring_id);
446
/* Record the view that just became primary as the new last_primary. */
447 memcpy (ykd_state.last_primary.member_list, ykd_view_list, sizeof (ykd_view_list));
448 ykd_state.last_primary.member_list_entries = ykd_view_list_entries;
451 }
452 break;
453 }
454}
455
/*
 * Set until the first regular configuration change has been handled.
 * NOTE(review): the statements guarded by it (original lines 473-475) are
 * absent from this listing.
 */
456int first_run = 1;
/*
 * Configuration-change callback registered with api->tpg_init() in
 * ykd_init().
 *
 * Only TOTEM_CONFIGURATION_REGULAR changes are processed. For those it
 * records ring_id and the member list as the current view, clears the
 * primary designation and reports that through ykd_primary_callback_fn,
 * rebuilds both state_received tables from member_list, and schedules a
 * state multicast with ykd_state_send().
 *
 * left_list / joined_list and their counts are not referenced in the
 * visible code.
 *
 * NOTE(review): member_list_entries is not checked against
 * YKD_PROCESSOR_COUNT_MAX before the copies into the fixed-size arrays -
 * confirm the caller guarantees the bound.
 * NOTE(review): original line 482 is absent from this listing.
 */
457static void ykd_confchg_fn (
458 enum totem_configuration_type configuration_type,
459 const unsigned int *member_list, size_t member_list_entries,
460 const unsigned int *left_list, size_t left_list_entries,
461 const unsigned int *joined_list, size_t joined_list_entries,
462 const struct memb_ring_id *ring_id)
463{
464 int i;
465
466 if (configuration_type != TOTEM_CONFIGURATION_REGULAR) {
467 return;
468 }
469
470 memcpy (&ykd_ring_id, ring_id, sizeof (struct memb_ring_id));
471
472 if (first_run) {
476 first_run = 0;
477 }
478 memcpy (ykd_view_list, member_list,
479 member_list_entries * sizeof (unsigned int));
480 ykd_view_list_entries = member_list_entries;
481
483
/* A new view starts out non-primary; listeners are told right away. */
484 ykd_primary_designated = 0;
485
486 ykd_primary_callback_fn (
487 ykd_view_list,
488 ykd_view_list_entries,
489 ykd_primary_designated,
490 &ykd_ring_id);
491
/* Rebuild the per-member table, then clone it into the working copy. */
492 memset (&state_received_confchg, 0, sizeof (state_received_confchg));
493 for (i = 0; i < member_list_entries; i++) {
494 state_received_confchg[i].nodeid = member_list[i];
495 state_received_confchg[i].received = 0;
496 }
497 memcpy (state_received_process, state_received_confchg,
498 sizeof (state_received_confchg));
499
500 state_received_confchg_entries = member_list_entries;
501 state_received_process_entries = member_list_entries;
502
503 ykd_state_send ();
504}
505
507 .group = "ykd",
508 .group_len = 3
509};
510
511char *ykd_init (
512 struct corosync_api_v1 *corosync_api,
513 quorum_set_quorate_fn_t set_primary)
514{
515 const char *error = NULL;
516
517 ykd_primary_callback_fn = set_primary;
518 api = corosync_api;
519
520 if (set_primary == 0) {
521 error = (char *)"set primary not set";
522 }
523
524 api->tpg_init (
525 &ykd_group_handle,
526 ykd_deliver_fn,
527 ykd_confchg_fn);
528
529 api->tpg_join (
530 ykd_group_handle,
531 &ykd_group,
532 1);
533
534 ykd_state_init ();
535
536 return ((char *)error);
537}
totem_configuration_type
The totem_configuration_type enum.
Definition: coroapi.h:132
@ TOTEM_CONFIGURATION_REGULAR
Definition: coroapi.h:133
unsigned int nodeid
Definition: coroapi.h:0
#define TOTEM_AGREED
Definition: coroapi.h:102
void(* quorum_set_quorate_fn_t)(const unsigned int *view_list, size_t view_list_entries, int quorate, struct memb_ring_id *)
Definition: exec/quorum.h:42
qb_handle_t hdb_handle_t
Definition: hdb.h:52
#define log_printf(level, format, args...)
Definition: logsys.h:332
#define LOGSYS_LEVEL_NOTICE
Definition: logsys.h:74
The corosync_api_v1 struct.
Definition: coroapi.h:225
int(* tpg_join)(void *instance, const struct corosync_tpg_group *groups, size_t group_cnt)
Definition: coroapi.h:330
unsigned int(* totem_nodeid_get)(void)
Definition: coroapi.h:275
int(* tpg_init)(void **instance, void(*deliver_fn)(unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required), void(*confchg_fn)(enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id))
Definition: coroapi.h:308
int(* schedwrk_create)(hdb_handle_t *handle, int(schedwrk_fn)(const void *), const void *context)
Definition: coroapi.h:372
int(* tpg_joined_mcast)(void *totempg_groups_instance, const struct iovec *iovec, unsigned int iov_len, int guarantee)
Definition: coroapi.h:340
The corosync_tpg_group struct.
Definition: coroapi.h:81
const void * group
Definition: coroapi.h:82
The memb_ring_id struct.
Definition: coroapi.h:122
struct ykd_state ykd_state
Definition: vsf_ykd.c:110
unsigned int nodeid
Definition: vsf_ykd.c:108
int id
Definition: vsf_ykd.c:84
int session_id
Definition: vsf_ykd.c:90
int member_list_entries
Definition: vsf_ykd.c:89
unsigned int member_list[YKD_PROCESSOR_COUNT_MAX]
Definition: vsf_ykd.c:88
struct ykd_session last_formed[YKD_PROCESSOR_COUNT_MAX]
Definition: vsf_ykd.c:96
struct ykd_session ambiguous_sessions[YKD_PROCESSOR_COUNT_MAX]
Definition: vsf_ykd.c:100
struct ykd_session last_primary
Definition: vsf_ykd.c:94
int ambiguous_sessions_entries
Definition: vsf_ykd.c:102
int last_formed_entries
Definition: vsf_ykd.c:98
int session_id
Definition: vsf_ykd.c:104
#define swab32(x)
The swab32 macro.
Definition: swab.h:51
int totemip_localhost_check(const struct totem_ip_address *addr)
Definition: totemip.c:225
struct memb_ring_id ring_id
Definition: totemsrp.c:4
struct totem_message_header header
Definition: totemsrp.c:0
#define YKD_PROCESSOR_COUNT_MAX
Definition: vsf_ykd.c:71
ykd_header_values
Definition: vsf_ykd.c:73
@ YKD_HEADER_SENDSTATE
Definition: vsf_ykd.c:74
@ YKD_HEADER_ATTEMPT
Definition: vsf_ykd.c:75
LOGSYS_DECLARE_SUBSYS("YKD")
hdb_handle_t schedwrk_attempt_send_callback_handle
Definition: vsf_ykd.c:143
int first_run
Definition: vsf_ykd.c:456
struct corosync_tpg_group ykd_group
Definition: vsf_ykd.c:506
hdb_handle_t schedwrk_state_send_callback_handle
Definition: vsf_ykd.c:145
ykd_mode
Definition: vsf_ykd.c:78
@ YKD_MODE_ATTEMPT
Definition: vsf_ykd.c:80
@ YKD_MODE_SENDSTATE
Definition: vsf_ykd.c:79
char * ykd_init(struct corosync_api_v1 *corosync_api, quorum_set_quorate_fn_t set_primary)
Definition: vsf_ykd.c:511