corosync 3.1.7
lib/cpg.c
Go to the documentation of this file.
1/*
2 * vi: set autoindent tabstop=4 shiftwidth=4 :
3 *
4 * Copyright (c) 2006-2015 Red Hat, Inc.
5 *
6 * All rights reserved.
7 *
8 * Author: Christine Caulfield (ccaulfi@redhat.com)
9 * Author: Jan Friesse (jfriesse@redhat.com)
10 *
11 * This software licensed under BSD license, the text of which follows:
12 *
13 * Redistribution and use in source and binary forms, with or without
14 * modification, are permitted provided that the following conditions are met:
15 *
16 * - Redistributions of source code must retain the above copyright notice,
17 * this list of conditions and the following disclaimer.
18 * - Redistributions in binary form must reproduce the above copyright notice,
19 * this list of conditions and the following disclaimer in the documentation
20 * and/or other materials provided with the distribution.
21 * - Neither the name of the MontaVista Software, Inc. nor the names of its
22 * contributors may be used to endorse or promote products derived from this
23 * software without specific prior written permission.
24 *
25 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
26 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
27 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
28 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
29 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
30 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
31 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
32 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
33 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
34 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
35 * THE POSSIBILITY OF SUCH DAMAGE.
36 */
37/*
38 * Provides a closed process group API using the coroipcc executive
39 */
40
41#include <config.h>
42
43#include <stdlib.h>
44#include <stdio.h>
45#include <string.h>
46#include <unistd.h>
47#include <sys/types.h>
48#include <sys/socket.h>
49#include <sys/mman.h>
50#include <sys/uio.h>
51#include <sys/stat.h>
52#include <errno.h>
53#include <limits.h>
54
55#include <qb/qblist.h>
56#include <qb/qbdefs.h>
57#include <qb/qbipcc.h>
58#include <qb/qblog.h>
59
60#include <corosync/hdb.h>
61#include <corosync/corotypes.h>
62#include <corosync/corodefs.h>
63#include <corosync/cpg.h>
64#include <corosync/ipc_cpg.h>
65
66#include "util.h"
67
68#ifndef MAP_ANONYMOUS
69#define MAP_ANONYMOUS MAP_ANON
70#endif
71
72/*
73 * Maximum number of times to retry a send when transmitting
74 * a large message fragment
75 */
76#define MAX_RETRIES 100
77
78/*
79 * ZCB files are created with the following umask (the same umask libqb uses)
80 */
81#define CPG_MEMORY_MAP_UMASK 077
82
84{
85 struct qb_list_head list;
86 uint32_t nodeid;
87 uint32_t pid;
90};
91
92struct cpg_inst {
93 qb_ipcc_connection_t *c;
95 void *context;
96 union {
99 };
100 struct qb_list_head iteration_list_head;
101 uint32_t max_msg_size;
102 struct qb_list_head assembly_list_head;
103};
104static void cpg_inst_free (void *inst);
105
106DECLARE_HDB_DATABASE(cpg_handle_t_db, cpg_inst_free);
107
110 qb_ipcc_connection_t *conn;
112 struct qb_list_head list;
113};
114
115DECLARE_HDB_DATABASE(cpg_iteration_handle_t_db,NULL);
116
117
118/*
119 * Internal (not visible by API) functions
120 */
121
122static cs_error_t
123coroipcc_msg_send_reply_receive (
124 qb_ipcc_connection_t *c,
125 const struct iovec *iov,
126 unsigned int iov_len,
127 void *res_msg,
128 size_t res_len)
129{
130 return qb_to_cs_error(qb_ipcc_sendv_recv(c, iov, iov_len, res_msg, res_len,
132}
133
134static void cpg_iteration_instance_finalize (struct cpg_iteration_instance_t *cpg_iteration_instance)
135{
136 qb_list_del (&cpg_iteration_instance->list);
137 hdb_handle_destroy (&cpg_iteration_handle_t_db, cpg_iteration_instance->cpg_iteration_handle);
138}
139
140static void cpg_inst_free (void *inst)
141{
142 struct cpg_inst *cpg_inst = (struct cpg_inst *)inst;
143 qb_ipcc_disconnect(cpg_inst->c);
144}
145
146static void cpg_inst_finalize (struct cpg_inst *cpg_inst, hdb_handle_t handle)
147{
148 struct qb_list_head *iter, *tmp_iter;
150
151 /*
152 * Traverse through the iteration instances and delete them
153 */
154 qb_list_for_each_safe(iter, tmp_iter, &(cpg_inst->iteration_list_head)) {
155 cpg_iteration_instance = qb_list_entry (iter, struct cpg_iteration_instance_t, list);
156
157 cpg_iteration_instance_finalize (cpg_iteration_instance);
158 }
159 hdb_handle_destroy (&cpg_handle_t_db, handle);
160}
161
170 cpg_handle_t *handle,
171 cpg_callbacks_t *callbacks)
172{
173 cpg_model_v1_data_t model_v1_data;
174
175 memset (&model_v1_data, 0, sizeof (cpg_model_v1_data_t));
176
177 if (callbacks) {
178 model_v1_data.cpg_deliver_fn = callbacks->cpg_deliver_fn;
179 model_v1_data.cpg_confchg_fn = callbacks->cpg_confchg_fn;
180 }
181
182 return (cpg_model_initialize (handle, CPG_MODEL_V1, (cpg_model_data_t *)&model_v1_data, NULL));
183}
184
186 cpg_handle_t *handle,
187 cpg_model_t model,
188 cpg_model_data_t *model_data,
189 void *context)
190{
191 cs_error_t error;
192 struct cpg_inst *cpg_inst;
193
194 if (model != CPG_MODEL_V1) {
195 error = CS_ERR_INVALID_PARAM;
196 goto error_no_destroy;
197 }
198
199 error = hdb_error_to_cs (hdb_handle_create (&cpg_handle_t_db, sizeof (struct cpg_inst), handle));
200 if (error != CS_OK) {
201 goto error_no_destroy;
202 }
203
204 error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, *handle, (void *)&cpg_inst));
205 if (error != CS_OK) {
206 goto error_destroy;
207 }
208
209 cpg_inst->c = qb_ipcc_connect ("cpg", IPC_REQUEST_SIZE);
210 if (cpg_inst->c == NULL) {
211 error = qb_to_cs_error(-errno);
212 goto error_put_destroy;
213 }
214
215 if (model_data != NULL) {
216 switch (model) {
217 case CPG_MODEL_V1:
220 error = CS_ERR_INVALID_PARAM;
221
222 goto error_destroy;
223 }
224 break;
225 }
226 }
227
228 /* Allow space for corosync internal headers */
230 cpg_inst->model_data.model = model;
232
233 qb_list_init(&cpg_inst->iteration_list_head);
234
235 qb_list_init(&cpg_inst->assembly_list_head);
236
237 hdb_handle_put (&cpg_handle_t_db, *handle);
238
239 return (CS_OK);
240
241error_put_destroy:
242 hdb_handle_put (&cpg_handle_t_db, *handle);
243error_destroy:
244 hdb_handle_destroy (&cpg_handle_t_db, *handle);
245error_no_destroy:
246 return (error);
247}
248
250 cpg_handle_t handle)
251{
252 struct cpg_inst *cpg_inst;
253 struct iovec iov;
256 cs_error_t error;
257
258 error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
259 if (error != CS_OK) {
260 return (error);
261 }
262
263 /*
264 * Another thread has already started finalizing
265 */
266 if (cpg_inst->finalize) {
267 hdb_handle_put (&cpg_handle_t_db, handle);
268 return (CS_ERR_BAD_HANDLE);
269 }
270
271 cpg_inst->finalize = 1;
272
273 /*
274 * Send service request
275 */
276 req_lib_cpg_finalize.header.size = sizeof (struct req_lib_cpg_finalize);
278
279 iov.iov_base = (void *)&req_lib_cpg_finalize;
280 iov.iov_len = sizeof (struct req_lib_cpg_finalize);
281
282 error = coroipcc_msg_send_reply_receive (cpg_inst->c,
283 &iov,
284 1,
286 sizeof (struct res_lib_cpg_finalize));
287
288 cpg_inst_finalize (cpg_inst, handle);
289 hdb_handle_put (&cpg_handle_t_db, handle);
290
291 return (error);
292}
293
295 cpg_handle_t handle,
296 int *fd)
297{
298 cs_error_t error;
299 struct cpg_inst *cpg_inst;
300
301 error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
302 if (error != CS_OK) {
303 return (error);
304 }
305
306 error = qb_to_cs_error (qb_ipcc_fd_get (cpg_inst->c, fd));
307
308 hdb_handle_put (&cpg_handle_t_db, handle);
309
310 return (error);
311}
312
314 cpg_handle_t handle,
315 uint32_t *size)
316{
317 cs_error_t error;
318 struct cpg_inst *cpg_inst;
319
320 error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
321 if (error != CS_OK) {
322 return (error);
323 }
324
325 *size = cpg_inst->max_msg_size;
326
327 hdb_handle_put (&cpg_handle_t_db, handle);
328
329 return (error);
330}
331
333 cpg_handle_t handle,
334 void **context)
335{
336 cs_error_t error;
337 struct cpg_inst *cpg_inst;
338
339 error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
340 if (error != CS_OK) {
341 return (error);
342 }
343
345
346 hdb_handle_put (&cpg_handle_t_db, handle);
347
348 return (CS_OK);
349}
350
352 cpg_handle_t handle,
353 void *context)
354{
355 cs_error_t error;
356 struct cpg_inst *cpg_inst;
357
358 error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
359 if (error != CS_OK) {
360 return (error);
361 }
362
364
365 hdb_handle_put (&cpg_handle_t_db, handle);
366
367 return (CS_OK);
368}
369
371 cpg_handle_t handle,
372 cs_dispatch_flags_t dispatch_types)
373{
374 int timeout = -1;
375 cs_error_t error;
376 int cont = 1; /* always continue do loop except when set to 0 */
377 struct cpg_inst *cpg_inst;
378 struct res_lib_cpg_confchg_callback *res_cpg_confchg_callback;
379 struct res_lib_cpg_deliver_callback *res_cpg_deliver_callback;
380 struct res_lib_cpg_partial_deliver_callback *res_cpg_partial_deliver_callback;
381 struct res_lib_cpg_totem_confchg_callback *res_cpg_totem_confchg_callback;
382 struct cpg_inst cpg_inst_copy;
383 struct qb_ipc_response_header *dispatch_data;
384 struct cpg_address member_list[CPG_MEMBERS_MAX];
385 struct cpg_address left_list[CPG_MEMBERS_MAX];
386 struct cpg_address joined_list[CPG_MEMBERS_MAX];
387 struct cpg_name group_name;
388 struct cpg_assembly_data *assembly_data;
389 struct qb_list_head *iter, *tmp_iter;
390 mar_cpg_address_t *left_list_start;
391 mar_cpg_address_t *joined_list_start;
392 unsigned int i;
393 struct cpg_ring_id ring_id;
394 uint32_t totem_member_list[CPG_MEMBERS_MAX];
395 int32_t errno_res;
396 char dispatch_buf[IPC_DISPATCH_SIZE];
397
398 error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
399 if (error != CS_OK) {
400 return (error);
401 }
402
403 /*
404 * Timeout instantly for CS_DISPATCH_ONE_NONBLOCKING or CS_DISPATCH_ALL and
405 * wait indefinitely for CS_DISPATCH_ONE or CS_DISPATCH_BLOCKING
406 */
407 if (dispatch_types == CS_DISPATCH_ALL || dispatch_types == CS_DISPATCH_ONE_NONBLOCKING) {
408 timeout = 0;
409 }
410
411 dispatch_data = (struct qb_ipc_response_header *)dispatch_buf;
412 do {
413 errno_res = qb_ipcc_event_recv (
414 cpg_inst->c,
415 dispatch_buf,
417 timeout);
418 error = qb_to_cs_error (errno_res);
419 if (error == CS_ERR_BAD_HANDLE) {
420 error = CS_OK;
421 goto error_put;
422 }
423 if (error == CS_ERR_TRY_AGAIN) {
424 if (dispatch_types == CS_DISPATCH_ONE_NONBLOCKING) {
425 /*
426 * Don't mask error
427 */
428 goto error_put;
429 }
430 error = CS_OK;
431 if (dispatch_types == CS_DISPATCH_ALL) {
432 break; /* exit do while cont is 1 loop */
433 } else {
434 continue; /* next poll */
435 }
436 }
437 if (error != CS_OK) {
438 goto error_put;
439 }
440
441 /*
442 * Make copy of callbacks, message data, unlock instance, and call callback
443 * A risk of this dispatch method is that the callback routines may
444 * operate at the same time that cpgFinalize has been called.
445 */
446 memcpy (&cpg_inst_copy, cpg_inst, sizeof (struct cpg_inst));
447 switch (cpg_inst_copy.model_data.model) {
448 case CPG_MODEL_V1:
449 /*
450 * Dispatch incoming message
451 */
452 switch (dispatch_data->id) {
454 if (cpg_inst_copy.model_v1_data.cpg_deliver_fn == NULL) {
455 break;
456 }
457
458 res_cpg_deliver_callback = (struct res_lib_cpg_deliver_callback *)dispatch_data;
459
460 marshall_from_mar_cpg_name_t (
461 &group_name,
462 &res_cpg_deliver_callback->group_name);
463
464 cpg_inst_copy.model_v1_data.cpg_deliver_fn (handle,
465 &group_name,
466 res_cpg_deliver_callback->nodeid,
467 res_cpg_deliver_callback->pid,
468 &res_cpg_deliver_callback->message,
469 res_cpg_deliver_callback->msglen);
470 break;
471
473 res_cpg_partial_deliver_callback = (struct res_lib_cpg_partial_deliver_callback *)dispatch_data;
474
475 marshall_from_mar_cpg_name_t (
476 &group_name,
477 &res_cpg_partial_deliver_callback->group_name);
478
479 /*
480 * Search for assembly data for current messages (nodeid, pid) pair in list of assemblies
481 */
482 assembly_data = NULL;
483 qb_list_for_each(iter, &(cpg_inst->assembly_list_head)) {
484 struct cpg_assembly_data *current_assembly_data = qb_list_entry (iter, struct cpg_assembly_data, list);
485 if (current_assembly_data->nodeid == res_cpg_partial_deliver_callback->nodeid && current_assembly_data->pid == res_cpg_partial_deliver_callback->pid) {
486 assembly_data = current_assembly_data;
487 break;
488 }
489 }
490
491 if (res_cpg_partial_deliver_callback->type == LIBCPG_PARTIAL_FIRST) {
492
493 /*
494 * As this is LIBCPG_PARTIAL_FIRST packet, check that there is no ongoing assembly.
495 * Otherwise the sending of packet must have been interrupted and error should have
496 * been reported to sending client. Therefore here last assembly will be dropped.
497 */
498 if (assembly_data) {
499 qb_list_del (&assembly_data->list);
500 free(assembly_data->assembly_buf);
501 free(assembly_data);
502 assembly_data = NULL;
503 }
504
505 assembly_data = malloc(sizeof(struct cpg_assembly_data));
506 if (!assembly_data) {
507 error = CS_ERR_NO_MEMORY;
508 goto error_put;
509 }
510
511 assembly_data->nodeid = res_cpg_partial_deliver_callback->nodeid;
512 assembly_data->pid = res_cpg_partial_deliver_callback->pid;
513 assembly_data->assembly_buf = malloc(res_cpg_partial_deliver_callback->msglen);
514 if (!assembly_data->assembly_buf) {
515 free(assembly_data);
516 error = CS_ERR_NO_MEMORY;
517 goto error_put;
518 }
519 assembly_data->assembly_buf_ptr = 0;
520 qb_list_init (&assembly_data->list);
521
522 qb_list_add (&assembly_data->list, &cpg_inst->assembly_list_head);
523 }
524 if (assembly_data) {
525 memcpy(assembly_data->assembly_buf + assembly_data->assembly_buf_ptr,
526 res_cpg_partial_deliver_callback->message, res_cpg_partial_deliver_callback->fraglen);
527 assembly_data->assembly_buf_ptr += res_cpg_partial_deliver_callback->fraglen;
528
529 if (res_cpg_partial_deliver_callback->type == LIBCPG_PARTIAL_LAST) {
530 cpg_inst_copy.model_v1_data.cpg_deliver_fn (handle,
531 &group_name,
532 res_cpg_partial_deliver_callback->nodeid,
533 res_cpg_partial_deliver_callback->pid,
534 assembly_data->assembly_buf,
535 res_cpg_partial_deliver_callback->msglen);
536
537 qb_list_del (&assembly_data->list);
538 free(assembly_data->assembly_buf);
539 free(assembly_data);
540 }
541 }
542 break;
543
545 if (cpg_inst_copy.model_v1_data.cpg_confchg_fn == NULL) {
546 break;
547 }
548
549 res_cpg_confchg_callback = (struct res_lib_cpg_confchg_callback *)dispatch_data;
550
551 for (i = 0; i < res_cpg_confchg_callback->member_list_entries; i++) {
552 marshall_from_mar_cpg_address_t (&member_list[i],
553 &res_cpg_confchg_callback->member_list[i]);
554 }
555 left_list_start = res_cpg_confchg_callback->member_list +
556 res_cpg_confchg_callback->member_list_entries;
557 for (i = 0; i < res_cpg_confchg_callback->left_list_entries; i++) {
558 marshall_from_mar_cpg_address_t (&left_list[i],
559 &left_list_start[i]);
560 }
561 joined_list_start = res_cpg_confchg_callback->member_list +
562 res_cpg_confchg_callback->member_list_entries +
563 res_cpg_confchg_callback->left_list_entries;
564 for (i = 0; i < res_cpg_confchg_callback->joined_list_entries; i++) {
565 marshall_from_mar_cpg_address_t (&joined_list[i],
566 &joined_list_start[i]);
567 }
568 marshall_from_mar_cpg_name_t (
569 &group_name,
570 &res_cpg_confchg_callback->group_name);
571
572 cpg_inst_copy.model_v1_data.cpg_confchg_fn (handle,
573 &group_name,
575 res_cpg_confchg_callback->member_list_entries,
576 left_list,
577 res_cpg_confchg_callback->left_list_entries,
578 joined_list,
579 res_cpg_confchg_callback->joined_list_entries);
580
581 /*
582 * If a member left while its partial packet was being assembled, its assembly data must be removed from the list
583 */
584 for (i = 0; i < res_cpg_confchg_callback->left_list_entries; i++) {
585 qb_list_for_each_safe(iter, tmp_iter, &(cpg_inst->assembly_list_head)) {
586 struct cpg_assembly_data *current_assembly_data = qb_list_entry (iter, struct cpg_assembly_data, list);
587 if (current_assembly_data->nodeid != left_list[i].nodeid || current_assembly_data->pid != left_list[i].pid)
588 continue;
589
590 qb_list_del (&current_assembly_data->list);
591 free(current_assembly_data->assembly_buf);
592 free(current_assembly_data);
593 }
594 }
595
596 break;
598 if (cpg_inst_copy.model_v1_data.cpg_totem_confchg_fn == NULL) {
599 break;
600 }
601
602 res_cpg_totem_confchg_callback = (struct res_lib_cpg_totem_confchg_callback *)dispatch_data;
603
604 marshall_from_mar_cpg_ring_id_t (&ring_id, &res_cpg_totem_confchg_callback->ring_id);
605 for (i = 0; i < res_cpg_totem_confchg_callback->member_list_entries; i++) {
606 totem_member_list[i] = res_cpg_totem_confchg_callback->member_list[i];
607 }
608
609 cpg_inst_copy.model_v1_data.cpg_totem_confchg_fn (handle,
610 ring_id,
611 res_cpg_totem_confchg_callback->member_list_entries,
612 totem_member_list);
613 break;
614 default:
615 error = CS_ERR_LIBRARY;
616 goto error_put;
617 break;
618 } /* - switch (dispatch_data->id) */
619 break; /* case CPG_MODEL_V1 */
620 } /* - switch (cpg_inst_copy.model_data.model) */
621
622 if (cpg_inst_copy.finalize || cpg_inst->finalize) {
623 /*
624 * If the finalize has been called then get out of the dispatch.
625 */
626 cpg_inst->finalize = 1;
627 error = CS_ERR_BAD_HANDLE;
628 goto error_put;
629 }
630
631 /*
632 * Determine if more messages should be processed
633 */
634 if (dispatch_types == CS_DISPATCH_ONE || dispatch_types == CS_DISPATCH_ONE_NONBLOCKING) {
635 cont = 0;
636 }
637 } while (cont);
638
639error_put:
640 hdb_handle_put (&cpg_handle_t_db, handle);
641 return (error);
642}
643
645 cpg_handle_t handle,
646 const struct cpg_name *group)
647{
648 cs_error_t error;
649 struct cpg_inst *cpg_inst;
650 struct iovec iov[2];
652 struct res_lib_cpg_join response;
653
654 if (group->length > CPG_MAX_NAME_LENGTH) {
655 return (CS_ERR_NAME_TOO_LONG);
656 }
657
658 error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
659 if (error != CS_OK) {
660 return (error);
661 }
662
663 /* Now join */
664 req_lib_cpg_join.header.size = sizeof (struct req_lib_cpg_join);
666 req_lib_cpg_join.pid = getpid();
667 req_lib_cpg_join.flags = 0;
668
669 switch (cpg_inst->model_data.model) {
670 case CPG_MODEL_V1:
672 break;
673 }
674
675 marshall_to_mar_cpg_name_t (&req_lib_cpg_join.group_name,
676 group);
677
678 iov[0].iov_base = (void *)&req_lib_cpg_join;
679 iov[0].iov_len = sizeof (struct req_lib_cpg_join);
680
681 do {
682 error = coroipcc_msg_send_reply_receive (cpg_inst->c, iov, 1,
683 &response, sizeof (struct res_lib_cpg_join));
684
685 if (error != CS_OK) {
686 goto error_exit;
687 }
688 } while (response.header.error == CS_ERR_BUSY);
689
690 error = response.header.error;
691
692error_exit:
693 hdb_handle_put (&cpg_handle_t_db, handle);
694
695 return (error);
696}
697
699 cpg_handle_t handle,
700 const struct cpg_name *group)
701{
702 cs_error_t error;
703 struct cpg_inst *cpg_inst;
704 struct iovec iov[2];
707
708 if (group->length > CPG_MAX_NAME_LENGTH) {
709 return (CS_ERR_NAME_TOO_LONG);
710 }
711
712 error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
713 if (error != CS_OK) {
714 return (error);
715 }
716
717 req_lib_cpg_leave.header.size = sizeof (struct req_lib_cpg_leave);
719 req_lib_cpg_leave.pid = getpid();
720 marshall_to_mar_cpg_name_t (&req_lib_cpg_leave.group_name,
721 group);
722
723 iov[0].iov_base = (void *)&req_lib_cpg_leave;
724 iov[0].iov_len = sizeof (struct req_lib_cpg_leave);
725
726 do {
727 error = coroipcc_msg_send_reply_receive (cpg_inst->c, iov, 1,
728 &res_lib_cpg_leave, sizeof (struct res_lib_cpg_leave));
729
730 if (error != CS_OK) {
731 goto error_exit;
732 }
733 } while (res_lib_cpg_leave.header.error == CS_ERR_BUSY);
734
735 error = res_lib_cpg_leave.header.error;
736
737error_exit:
738 hdb_handle_put (&cpg_handle_t_db, handle);
739
740 return (error);
741}
742
744 cpg_handle_t handle,
745 struct cpg_name *group_name,
746 struct cpg_address *member_list,
747 int *member_list_entries)
748{
749 cs_error_t error;
750 struct cpg_inst *cpg_inst;
751 struct iovec iov;
754 unsigned int i;
755
756 if (group_name->length > CPG_MAX_NAME_LENGTH) {
757 return (CS_ERR_NAME_TOO_LONG);
758 }
759 if (member_list == NULL) {
760 return (CS_ERR_INVALID_PARAM);
761 }
762 if (member_list_entries == NULL) {
763 return (CS_ERR_INVALID_PARAM);
764 }
765
766 error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
767 if (error != CS_OK) {
768 return (error);
769 }
770
771 req_lib_cpg_membership_get.header.size = sizeof (struct req_lib_cpg_membership_get);
773
774 marshall_to_mar_cpg_name_t (&req_lib_cpg_membership_get.group_name,
775 group_name);
776
777 iov.iov_base = (void *)&req_lib_cpg_membership_get;
778 iov.iov_len = sizeof (struct req_lib_cpg_membership_get);
779
780 error = coroipcc_msg_send_reply_receive (cpg_inst->c, &iov, 1,
782
783 if (error != CS_OK) {
784 goto error_exit;
785 }
786
787 error = res_lib_cpg_membership_get.header.error;
788
789 /*
790 * Copy results to caller
791 */
792 *member_list_entries = res_lib_cpg_membership_get.member_count;
793 if (member_list) {
794 for (i = 0; i < res_lib_cpg_membership_get.member_count; i++) {
795 marshall_from_mar_cpg_address_t (&member_list[i],
797 }
798 }
799
800error_exit:
801 hdb_handle_put (&cpg_handle_t_db, handle);
802
803 return (error);
804}
805
807 cpg_handle_t handle,
808 unsigned int *local_nodeid)
809{
810 cs_error_t error;
811 struct cpg_inst *cpg_inst;
812 struct iovec iov;
815
816 error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
817 if (error != CS_OK) {
818 return (error);
819 }
820
821 req_lib_cpg_local_get.header.size = sizeof (struct qb_ipc_request_header);
823
824 iov.iov_base = (void *)&req_lib_cpg_local_get;
825 iov.iov_len = sizeof (struct req_lib_cpg_local_get);
826
827 error = coroipcc_msg_send_reply_receive (cpg_inst->c, &iov, 1,
829
830 if (error != CS_OK) {
831 goto error_exit;
832 }
833
834 error = res_lib_cpg_local_get.header.error;
835
836 *local_nodeid = res_lib_cpg_local_get.local_nodeid;
837
838error_exit:
839 hdb_handle_put (&cpg_handle_t_db, handle);
840
841 return (error);
842}
843
845 cpg_handle_t handle,
846 cpg_flow_control_state_t *flow_control_state)
847{
848 cs_error_t error;
849 struct cpg_inst *cpg_inst;
850
851 error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
852 if (error != CS_OK) {
853 return (error);
854 }
855 *flow_control_state = CPG_FLOW_CONTROL_DISABLED;
856 error = CS_OK;
857
858 hdb_handle_put (&cpg_handle_t_db, handle);
859
860 return (error);
861}
862
863static int
864memory_map (char *path, const char *file, void **buf, size_t bytes)
865{
866 int32_t fd;
867 void *addr;
868 int32_t res;
869 char *buffer;
870 int32_t i;
871 size_t written;
872 size_t page_size;
873 long int sysconf_page_size;
874 mode_t old_umask;
875
876 snprintf (path, PATH_MAX, "/dev/shm/%s", file);
877
878 old_umask = umask(CPG_MEMORY_MAP_UMASK);
879 fd = mkstemp (path);
880 (void)umask(old_umask);
881 if (fd == -1) {
882 snprintf (path, PATH_MAX, LOCALSTATEDIR "/run/%s", file);
883 old_umask = umask(CPG_MEMORY_MAP_UMASK);
884 fd = mkstemp (path);
885 (void)umask(old_umask);
886 if (fd == -1) {
887 return (-1);
888 }
889 }
890
891 res = ftruncate (fd, bytes);
892 if (res == -1) {
893 goto error_close_unlink;
894 }
895 sysconf_page_size = sysconf(_SC_PAGESIZE);
896 if (sysconf_page_size <= 0) {
897 goto error_close_unlink;
898 }
899 page_size = sysconf_page_size;
900 buffer = malloc (page_size);
901 if (buffer == NULL) {
902 goto error_close_unlink;
903 }
904 memset (buffer, 0, page_size);
905 for (i = 0; i < (bytes / page_size); i++) {
906retry_write:
907 written = write (fd, buffer, page_size);
908 if (written == -1 && errno == EINTR) {
909 goto retry_write;
910 }
911 if (written != page_size) {
912 free (buffer);
913 goto error_close_unlink;
914 }
915 }
916 free (buffer);
917
918 addr = mmap (NULL, bytes, PROT_READ | PROT_WRITE,
919 MAP_SHARED, fd, 0);
920
921 if (addr == MAP_FAILED) {
922 goto error_close_unlink;
923 }
924#ifdef MADV_NOSYNC
925 madvise(addr, bytes, MADV_NOSYNC);
926#endif
927
928 res = close (fd);
929 if (res) {
930 munmap(addr, bytes);
931
932 return (-1);
933 }
934 *buf = addr;
935
936 return 0;
937
938error_close_unlink:
939 close (fd);
940 unlink(path);
941 return -1;
942}
943
945 cpg_handle_t handle,
946 size_t size,
947 void **buffer)
948{
949 void *buf = NULL;
950 char path[PATH_MAX];
951 mar_req_coroipcc_zc_alloc_t req_coroipcc_zc_alloc;
952 struct qb_ipc_response_header res_coroipcs_zc_alloc;
953 size_t map_size;
954 struct iovec iovec;
955 struct coroipcs_zc_header *hdr;
956 cs_error_t error;
957 struct cpg_inst *cpg_inst;
958
959 error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
960 if (error != CS_OK) {
961 return (error);
962 }
963
964 map_size = size + sizeof (struct req_lib_cpg_mcast) + sizeof (struct coroipcs_zc_header);
965 assert(memory_map (path, "corosync_zerocopy-XXXXXX", &buf, map_size) != -1);
966
967 if (strlen(path) >= CPG_ZC_PATH_LEN) {
968 unlink(path);
969 munmap (buf, map_size);
970 return (CS_ERR_NAME_TOO_LONG);
971 }
972
973 req_coroipcc_zc_alloc.header.size = sizeof (mar_req_coroipcc_zc_alloc_t);
974 req_coroipcc_zc_alloc.header.id = MESSAGE_REQ_CPG_ZC_ALLOC;
975 req_coroipcc_zc_alloc.map_size = map_size;
976 strcpy (req_coroipcc_zc_alloc.path_to_file, path);
977
978 iovec.iov_base = (void *)&req_coroipcc_zc_alloc;
979 iovec.iov_len = sizeof (mar_req_coroipcc_zc_alloc_t);
980
981 error = coroipcc_msg_send_reply_receive (
982 cpg_inst->c,
983 &iovec,
984 1,
985 &res_coroipcs_zc_alloc,
986 sizeof (struct qb_ipc_response_header));
987
988 if (error != CS_OK) {
989 goto error_exit;
990 }
991
992 hdr = (struct coroipcs_zc_header *)buf;
993 hdr->map_size = map_size;
994 *buffer = ((char *)buf) + sizeof (struct coroipcs_zc_header) + sizeof (struct req_lib_cpg_mcast);
995
996error_exit:
997 hdb_handle_put (&cpg_handle_t_db, handle);
998 return (error);
999}
1000
1002 cpg_handle_t handle,
1003 void *buffer)
1004{
1005 cs_error_t error;
1006 unsigned int res;
1007 struct cpg_inst *cpg_inst;
1008 mar_req_coroipcc_zc_free_t req_coroipcc_zc_free;
1009 struct qb_ipc_response_header res_coroipcs_zc_free;
1010 struct iovec iovec;
1011 struct coroipcs_zc_header *header = (struct coroipcs_zc_header *)((char *)buffer - sizeof (struct coroipcs_zc_header) - sizeof (struct req_lib_cpg_mcast));
1012
1013 error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
1014 if (error != CS_OK) {
1015 return (error);
1016 }
1017
1018 req_coroipcc_zc_free.header.size = sizeof (mar_req_coroipcc_zc_free_t);
1019 req_coroipcc_zc_free.header.id = MESSAGE_REQ_CPG_ZC_FREE;
1020 req_coroipcc_zc_free.map_size = header->map_size;
1021 req_coroipcc_zc_free.server_address = header->server_address;
1022
1023 iovec.iov_base = (void *)&req_coroipcc_zc_free;
1024 iovec.iov_len = sizeof (mar_req_coroipcc_zc_free_t);
1025
1026 error = coroipcc_msg_send_reply_receive (
1027 cpg_inst->c,
1028 &iovec,
1029 1,
1030 &res_coroipcs_zc_free,
1031 sizeof (struct qb_ipc_response_header));
1032
1033 if (error != CS_OK) {
1034 goto error_exit;
1035 }
1036
1037 res = munmap ((void *)header, header->map_size);
1038 if (res == -1) {
1039 error = qb_to_cs_error(-errno);
1040
1041 goto error_exit;
1042 }
1043
1044error_exit:
1045 hdb_handle_put (&cpg_handle_t_db, handle);
1046
1047 return (error);
1048}
1049
1051 cpg_handle_t handle,
1053 void *msg,
1054 size_t msg_len)
1055{
1056 cs_error_t error;
1057 struct cpg_inst *cpg_inst;
1060 mar_req_coroipcc_zc_execute_t req_coroipcc_zc_execute;
1061 struct coroipcs_zc_header *hdr;
1062 struct iovec iovec;
1063
1064 error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
1065 if (error != CS_OK) {
1066 return (error);
1067 }
1068
1069 if (msg_len > IPC_REQUEST_SIZE) {
1070 error = CS_ERR_TOO_BIG;
1071 goto error_exit;
1072 }
1073
1074 req_lib_cpg_mcast = (struct req_lib_cpg_mcast *)(((char *)msg) - sizeof (struct req_lib_cpg_mcast));
1075 req_lib_cpg_mcast->header.size = sizeof (struct req_lib_cpg_mcast) +
1076 msg_len;
1077
1079 req_lib_cpg_mcast->guarantee = guarantee;
1080 req_lib_cpg_mcast->msglen = msg_len;
1081
1082 hdr = (struct coroipcs_zc_header *)(((char *)req_lib_cpg_mcast) - sizeof (struct coroipcs_zc_header));
1083
1084 req_coroipcc_zc_execute.header.size = sizeof (mar_req_coroipcc_zc_execute_t);
1085 req_coroipcc_zc_execute.header.id = MESSAGE_REQ_CPG_ZC_EXECUTE;
1086 req_coroipcc_zc_execute.server_address = hdr->server_address;
1087
1088 iovec.iov_base = (void *)&req_coroipcc_zc_execute;
1089 iovec.iov_len = sizeof (mar_req_coroipcc_zc_execute_t);
1090
1091 error = coroipcc_msg_send_reply_receive (
1092 cpg_inst->c,
1093 &iovec,
1094 1,
1096 sizeof(res_lib_cpg_mcast));
1097
1098 if (error != CS_OK) {
1099 goto error_exit;
1100 }
1101
1102 error = res_lib_cpg_mcast.header.error;
1103
1104error_exit:
1105 hdb_handle_put (&cpg_handle_t_db, handle);
1106
1107 return (error);
1108}
1109
1110static cs_error_t send_fragments (
1111 struct cpg_inst *cpg_inst,
1113 size_t msg_len,
1114 const struct iovec *iovec,
1115 unsigned int iov_len)
1116{
1117 int i;
1118 cs_error_t error = CS_OK;
1119 struct iovec iov[2];
1122 size_t sent = 0;
1123 size_t iov_sent = 0;
1124 int retry_count;
1125
1127 req_lib_cpg_mcast.guarantee = guarantee;
1128 req_lib_cpg_mcast.msglen = msg_len;
1129
1130 iov[0].iov_base = (void *)&req_lib_cpg_mcast;
1131 iov[0].iov_len = sizeof (struct req_lib_cpg_partial_mcast);
1132
1133 i=0;
1134 iov_sent = 0 ;
1135 qb_ipcc_fc_enable_max_set(cpg_inst->c, 2);
1136
1137 while (error == CS_OK && sent < msg_len) {
1138
1139 retry_count = 0;
1140 if ( (iovec[i].iov_len - iov_sent) > cpg_inst->max_msg_size) {
1141 iov[1].iov_len = cpg_inst->max_msg_size;
1142 }
1143 else {
1144 iov[1].iov_len = iovec[i].iov_len - iov_sent;
1145 }
1146
1147 if (sent == 0) {
1149 }
1150 else if ((sent + iov[1].iov_len) == msg_len) {
1152 }
1153 else {
1155 }
1156
1157 req_lib_cpg_mcast.fraglen = iov[1].iov_len;
1158 req_lib_cpg_mcast.header.size = sizeof (struct req_lib_cpg_partial_mcast) + iov[1].iov_len;
1159 iov[1].iov_base = (char *)iovec[i].iov_base + iov_sent;
1160
1161 resend:
1162 error = coroipcc_msg_send_reply_receive (cpg_inst->c, iov, 2,
1164 sizeof (res_lib_cpg_partial_send));
1165
1166 if (error == CS_ERR_TRY_AGAIN) {
1167 fprintf(stderr, "sleep. counter=%d\n", retry_count);
1168 if (++retry_count > MAX_RETRIES) {
1169 goto error_exit;
1170 }
1171 usleep(10000);
1172 goto resend;
1173 }
1174
1175 iov_sent += iov[1].iov_len;
1176 sent += iov[1].iov_len;
1177
1178 /* Next iovec */
1179 if (iov_sent >= iovec[i].iov_len) {
1180 i++;
1181 iov_sent = 0;
1182 }
1183 error = res_lib_cpg_partial_send.header.error;
1184 }
1185error_exit:
1186 qb_ipcc_fc_enable_max_set(cpg_inst->c, 1);
1187
1188 return error;
1189}
1190
1191
/*
 * cpg_mcast_joined: multicast the caller's iovec array as one CPG message.
 *
 * NOTE(review): this listing omits several original lines (1192, 1194,
 * 1202, 1222 by the embedded numbering). The function-name line, the
 * `guarantee` parameter and the declaration of `req_lib_cpg_mcast` are
 * therefore not visible here although the body uses them.
 *
 * handle   - CPG handle, resolved to a struct cpg_inst through cpg_handle_t_db.
 * iovec    - array of buffers that together form the message payload.
 * iov_len  - number of entries in iovec.
 *
 * Returns the error from the handle lookup, from send_fragments() (when the
 * payload exceeds max_msg_size), or from qb_ipcc_sendv() converted with
 * qb_to_cs_error().
 */
1193 cpg_handle_t handle,
1195 const struct iovec *iovec,
1196 unsigned int iov_len)
1197{
1198 int i;
1199 cs_error_t error;
1200 struct cpg_inst *cpg_inst;
/* Slot 0 carries the request header; the caller's iovecs are copied in from slot 1. */
1201 struct iovec iov[64];
1203 size_t msg_len = 0;
1204
/* Take a reference on the instance; nothing to release if the lookup fails. */
1205 error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
1206 if (error != CS_OK) {
1207 return (error);
1208 }
1209
/* Total payload size is the sum of all caller-supplied iovec lengths. */
1210 for (i = 0; i < iov_len; i++ ) {
1211 msg_len += iovec[i].iov_len;
1212 }
1213
/* Payloads larger than the instance's max_msg_size are handed to send_fragments(). */
1214 if (msg_len > cpg_inst->max_msg_size) {
1215 error = send_fragments(cpg_inst, guarantee, msg_len, iovec, iov_len);
1216 goto error_exit;
1217 }
1218
/* Header size covers the fixed request struct plus the whole payload. */
1219 req_lib_cpg_mcast.header.size = sizeof (struct req_lib_cpg_mcast) +
1220 msg_len;
1221
1223 req_lib_cpg_mcast.guarantee = guarantee;
1224 req_lib_cpg_mcast.msglen = msg_len;
1225
1226 iov[0].iov_base = (void *)&req_lib_cpg_mcast;
1227 iov[0].iov_len = sizeof (struct req_lib_cpg_mcast);
/*
 * NOTE(review): iov has 64 entries and entry 0 is already used, yet no check
 * that iov_len <= 63 is visible before this copy - confirm callers are
 * bounded or add a guard.
 */
1228 memcpy (&iov[1], iovec, iov_len * sizeof (struct iovec));
1229
/* fc_enable_max is set to 2 around the send and set to 1 again afterwards. */
1230 qb_ipcc_fc_enable_max_set(cpg_inst->c, 2);
1231 error = qb_to_cs_error(qb_ipcc_sendv(cpg_inst->c, iov, iov_len + 1));
1232 qb_ipcc_fc_enable_max_set(cpg_inst->c, 1);
1233
/* Common exit for both the fragmented and unfragmented paths: drop the handle reference. */
1234error_exit:
1235 hdb_handle_put (&cpg_handle_t_db, handle);
1236
1237 return (error);
1238}
1239
/*
 * cpg_iteration_initialize: create a local iteration handle and ask the
 * executive to start an iteration of the requested type.
 *
 * NOTE(review): this listing omits original lines 1240, 1249-1251, 1288,
 * 1292-1293, 1305 and 1316 (by the embedded numbering). The function-name
 * line, the declarations of cpg_iteration_instance and of the request/response
 * structs, and the reply-buffer argument are not visible here.
 *
 * handle               - CPG handle used for the IPC exchange.
 * iteration_type       - must be CPG_ITERATION_NAME_ONLY, CPG_ITERATION_ONE_GROUP
 *                        or CPG_ITERATION_ALL.
 * group                - required for CPG_ITERATION_ONE_GROUP, must be NULL otherwise.
 * cpg_iteration_handle - out: receives the newly created iteration handle.
 *
 * Returns CS_ERR_NAME_TOO_LONG or CS_ERR_INVALID_PARAM on bad arguments, a
 * handle-database or IPC error on failure, otherwise the error code carried
 * in the response header.
 */
1241 cpg_handle_t handle,
1242 cpg_iteration_type_t iteration_type,
1243 const struct cpg_name *group,
1244 cpg_iteration_handle_t *cpg_iteration_handle)
1245{
1246 cs_error_t error;
1247 struct iovec iov;
1248 struct cpg_inst *cpg_inst;
1252
/* Argument validation happens before any handle is acquired, so plain returns are safe. */
1253 if (group && group->length > CPG_MAX_NAME_LENGTH) {
1254 return (CS_ERR_NAME_TOO_LONG);
1255 }
1256 if (cpg_iteration_handle == NULL) {
1257 return (CS_ERR_INVALID_PARAM);
1258 }
1259
/* A group name must be given if and only if a single group is iterated. */
1260 if ((iteration_type == CPG_ITERATION_ONE_GROUP && group == NULL) ||
1261 (iteration_type != CPG_ITERATION_ONE_GROUP && group != NULL)) {
1262 return (CS_ERR_INVALID_PARAM);
1263 }
1264
/* Reject any iteration type outside the three known values. */
1265 if (iteration_type != CPG_ITERATION_NAME_ONLY && iteration_type != CPG_ITERATION_ONE_GROUP &&
1266 iteration_type != CPG_ITERATION_ALL) {
1267
1268 return (CS_ERR_INVALID_PARAM);
1269 }
1270
1271 error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
1272 if (error != CS_OK) {
1273 return (error);
1274 }
1275
/* Allocate the iteration instance; its handle is written straight to the caller's out-parameter. */
1276 error = hdb_error_to_cs (hdb_handle_create (&cpg_iteration_handle_t_db,
1277 sizeof (struct cpg_iteration_instance_t), cpg_iteration_handle));
1278 if (error != CS_OK) {
1279 goto error_put_cpg_db;
1280 }
1281
1282 error = hdb_error_to_cs (hdb_handle_get (&cpg_iteration_handle_t_db, *cpg_iteration_handle,
1283 (void *)&cpg_iteration_instance));
1284 if (error != CS_OK) {
1285 goto error_destroy;
1286 }
1287
1289
1290 qb_list_init (&cpg_iteration_instance->list);
1291
1294 req_lib_cpg_iterationinitialize.iteration_type = iteration_type;
/* group_name is only marshalled when a group was supplied. */
1295 if (group) {
1296 marshall_to_mar_cpg_name_t (&req_lib_cpg_iterationinitialize.group_name, group);
1297 }
1298
1299 iov.iov_base = (void *)&req_lib_cpg_iterationinitialize;
1300 iov.iov_len = sizeof (struct req_lib_cpg_iterationinitialize);
1301
1302 error = coroipcc_msg_send_reply_receive (cpg_inst->c,
1303 &iov,
1304 1,
1306 sizeof (struct res_lib_cpg_iterationinitialize));
1307
1308 if (error != CS_OK) {
1309 goto error_put_destroy;
1310 }
1311
/* Remember the executive's handle for later requests and the local handle for cleanup. */
1312 cpg_iteration_instance->executive_iteration_handle =
1313 res_lib_cpg_iterationinitialize.iteration_handle;
1314 cpg_iteration_instance->cpg_iteration_handle = *cpg_iteration_handle;
1315
1317
1318 hdb_handle_put (&cpg_iteration_handle_t_db, *cpg_iteration_handle);
1319 hdb_handle_put (&cpg_handle_t_db, handle);
1320
/*
 * NOTE(review): the response header's error is returned as-is; on this path
 * the iteration handle is not destroyed even if that error is not CS_OK -
 * confirm this is intended.
 */
1321 return (res_lib_cpg_iterationinitialize.header.error);
1322
/* Error unwinding, in reverse order of acquisition; each label falls through to the next. */
1323error_put_destroy:
1324 hdb_handle_put (&cpg_iteration_handle_t_db, *cpg_iteration_handle);
1325error_destroy:
1326 hdb_handle_destroy (&cpg_iteration_handle_t_db, *cpg_iteration_handle);
1327error_put_cpg_db:
1328 hdb_handle_put (&cpg_handle_t_db, handle);
1329
1330 return (error);
1331}
1332
/*
 * cpg_iteration_next: request the next entry of an iteration from the
 * executive and unmarshal it into *description.
 *
 * NOTE(review): this listing omits original lines 1333-1334, 1338-1340, 1353,
 * 1357 and 1364 (by the embedded numbering). The function-name line, the
 * `handle` parameter, the local declarations and the buffer arguments of the
 * send/recv calls are not visible here.
 *
 * description - out: filled from the response; must not be NULL.
 *
 * Returns CS_ERR_INVALID_PARAM for a NULL description, a handle-database or
 * IPC error on failure, otherwise the error code carried in the response header.
 */
1335 struct cpg_iteration_description_t *description)
1336{
1337 cs_error_t error;
1341
1342 if (description == NULL) {
1343 return CS_ERR_INVALID_PARAM;
1344 }
1345
1346 error = hdb_error_to_cs (hdb_handle_get (&cpg_iteration_handle_t_db, handle,
1347 (void *)&cpg_iteration_instance));
1348 if (error != CS_OK) {
1349 goto error_exit;
1350 }
1351
/* The request identifies the iteration by the handle the executive returned earlier. */
1352 req_lib_cpg_iterationnext.header.size = sizeof (struct req_lib_cpg_iterationnext);
1354 req_lib_cpg_iterationnext.iteration_handle = cpg_iteration_instance->executive_iteration_handle;
1355
/* Send and receive are issued as two separate calls on the instance's own connection. */
1356 error = qb_to_cs_error (qb_ipcc_send (cpg_iteration_instance->conn,
1358 req_lib_cpg_iterationnext.header.size));
1359 if (error != CS_OK) {
1360 goto error_put;
1361 }
1362
/* Final argument -1: presumably "wait without timeout" - confirm against the libqb documentation. */
1363 error = qb_to_cs_error (qb_ipcc_recv (cpg_iteration_instance->conn,
1365 sizeof(struct res_lib_cpg_iterationnext), -1));
1366 if (error != CS_OK) {
1367 goto error_put;
1368 }
1369
/* The description is unmarshalled before the response's error code is looked at. */
1370 marshall_from_mar_cpg_iteration_description_t(
1371 description,
1372 &res_lib_cpg_iterationnext.description);
1373
1374 error = res_lib_cpg_iterationnext.header.error;
1375
/* Success also falls through here to release the iteration handle reference. */
1376error_put:
1377 hdb_handle_put (&cpg_iteration_handle_t_db, handle);
1378
1379error_exit:
1380 return (error);
1381}
1382
/*
 * cpg_iteration_finalize: tell the executive to end an iteration and release
 * the local iteration instance.
 *
 * NOTE(review): this listing omits original lines 1383-1384, 1388-1390,
 * 1398-1399 and 1408 (by the embedded numbering). The function-name line, the
 * `handle` parameter, the local declarations and the reply-buffer argument
 * are not visible here.
 *
 * Returns a handle-database or IPC error on failure, otherwise the error code
 * carried in the response header.
 */
1385{
1386 cs_error_t error;
1387 struct iovec iov;
1391
1392 error = hdb_error_to_cs (hdb_handle_get (&cpg_iteration_handle_t_db, handle,
1393 (void *)&cpg_iteration_instance));
1394 if (error != CS_OK) {
1395 goto error_exit;
1396 }
1397
1400 req_lib_cpg_iterationfinalize.iteration_handle = cpg_iteration_instance->executive_iteration_handle;
1401
1402 iov.iov_base = (void *)&req_lib_cpg_iterationfinalize;
1403 iov.iov_len = sizeof (struct req_lib_cpg_iterationfinalize);
1404
/*
 * NOTE(review): the reply length below is taken from the *request* struct
 * (req_lib_cpg_iterationfinalize); cpg_iteration_initialize passes the size
 * of its res_ struct instead. Confirm the two structs have the same size or
 * switch to the response type.
 */
1405 error = coroipcc_msg_send_reply_receive (cpg_iteration_instance->conn,
1406 &iov,
1407 1,
1409 sizeof (struct req_lib_cpg_iterationfinalize));
1410
1411 if (error != CS_OK) {
1412 goto error_put;
1413 }
1414
/*
 * Success path: tear down the instance, then drop the reference using the
 * handle value stored in the instance. cpg_iteration_instance_finalize() is
 * defined outside this chunk.
 */
1415 cpg_iteration_instance_finalize (cpg_iteration_instance);
1416 hdb_handle_put (&cpg_iteration_handle_t_db, cpg_iteration_instance->cpg_iteration_handle);
1417
1418 return (res_lib_cpg_iterationfinalize.header.error);
1419
/* IPC failure: only the reference taken above is released; the instance is left in place. */
1420error_put:
1421 hdb_handle_put (&cpg_iteration_handle_t_db, handle);
1422error_exit:
1423 return (error);
1424}
1425
#define LOCALSTATEDIR
Definition: config.h:373
unsigned char addr[TOTEMIP_ADDRLEN]
Definition: coroapi.h:2
cs_dispatch_flags_t
The cs_dispatch_flags_t enum.
Definition: corotypes.h:84
@ CS_DISPATCH_ONE
Definition: corotypes.h:85
@ CS_DISPATCH_ONE_NONBLOCKING
Definition: corotypes.h:88
@ CS_DISPATCH_ALL
Definition: corotypes.h:86
cs_error_t qb_to_cs_error(int result)
qb_to_cs_error
cs_error_t
The cs_error_t enum.
Definition: corotypes.h:98
@ CS_ERR_NAME_TOO_LONG
Definition: corotypes.h:111
@ CS_ERR_NO_MEMORY
Definition: corotypes.h:106
@ CS_ERR_BUSY
Definition: corotypes.h:108
@ CS_ERR_BAD_HANDLE
Definition: corotypes.h:107
@ CS_ERR_TRY_AGAIN
Definition: corotypes.h:104
@ CS_OK
Definition: corotypes.h:99
@ CS_ERR_INVALID_PARAM
Definition: corotypes.h:105
@ CS_ERR_LIBRARY
Definition: corotypes.h:100
@ CS_ERR_TOO_BIG
Definition: corotypes.h:124
#define CS_IPC_TIMEOUT_MS
Definition: corotypes.h:131
cs_error_t cpg_flow_control_state_get(cpg_handle_t handle, cpg_flow_control_state_t *flow_control_state)
cpg_flow_control_state_get
Definition: lib/cpg.c:844
cs_error_t cpg_iteration_next(cpg_iteration_handle_t handle, struct cpg_iteration_description_t *description)
cpg_iteration_next
Definition: lib/cpg.c:1333
cs_error_t cpg_model_initialize(cpg_handle_t *handle, cpg_model_t model, cpg_model_data_t *model_data, void *context)
Create a new cpg connection, initialize with model.
Definition: lib/cpg.c:185
cs_error_t cpg_max_atomic_msgsize_get(cpg_handle_t handle, uint32_t *size)
Get maximum size of a message that will not be fragmented.
Definition: lib/cpg.c:313
cs_error_t cpg_mcast_joined(cpg_handle_t handle, cpg_guarantee_t guarantee, const struct iovec *iovec, unsigned int iov_len)
Multicast to groups joined with cpg_join.
Definition: lib/cpg.c:1192
cs_error_t cpg_leave(cpg_handle_t handle, const struct cpg_name *group)
Leave one or more groups.
Definition: lib/cpg.c:698
cs_error_t cpg_context_get(cpg_handle_t handle, void **context)
Get contexts for a CPG handle.
Definition: lib/cpg.c:332
cs_error_t cpg_fd_get(cpg_handle_t handle, int *fd)
Get a file descriptor on which to poll.
Definition: lib/cpg.c:294
cs_error_t cpg_join(cpg_handle_t handle, const struct cpg_name *group)
Join one or more groups.
Definition: lib/cpg.c:644
cs_error_t cpg_zcb_free(cpg_handle_t handle, void *buffer)
cpg_zcb_free
Definition: lib/cpg.c:1001
cs_error_t cpg_iteration_finalize(cpg_iteration_handle_t handle)
cpg_iteration_finalize
Definition: lib/cpg.c:1383
cs_error_t cpg_zcb_alloc(cpg_handle_t handle, size_t size, void **buffer)
cpg_zcb_alloc
Definition: lib/cpg.c:944
cs_error_t cpg_membership_get(cpg_handle_t handle, struct cpg_name *group_name, struct cpg_address *member_list, int *member_list_entries)
Get membership information from cpg.
Definition: lib/cpg.c:743
cs_error_t cpg_initialize(cpg_handle_t *handle, cpg_callbacks_t *callbacks)
Create a new cpg connection.
Definition: lib/cpg.c:169
cs_error_t cpg_local_get(cpg_handle_t handle, unsigned int *local_nodeid)
cpg_local_get
Definition: lib/cpg.c:806
cs_error_t cpg_iteration_initialize(cpg_handle_t handle, cpg_iteration_type_t iteration_type, const struct cpg_name *group, cpg_iteration_handle_t *cpg_iteration_handle)
cpg_iteration_initialize
Definition: lib/cpg.c:1240
cs_error_t cpg_context_set(cpg_handle_t handle, void *context)
Set contexts for a CPG handle.
Definition: lib/cpg.c:351
cs_error_t cpg_finalize(cpg_handle_t handle)
Close the cpg handle.
Definition: lib/cpg.c:249
cs_error_t cpg_dispatch(cpg_handle_t handle, cs_dispatch_flags_t dispatch_types)
Dispatch messages and configuration changes.
Definition: lib/cpg.c:370
cs_error_t cpg_zcb_mcast_joined(cpg_handle_t handle, cpg_guarantee_t guarantee, void *msg, size_t msg_len)
cpg_zcb_mcast_joined
Definition: lib/cpg.c:1050
cpg_flow_control_state_t
The cpg_flow_control_state_t enum.
Definition: cpg.h:73
#define CPG_MAX_NAME_LENGTH
Definition: cpg.h:116
cpg_guarantee_t
The cpg_guarantee_t enum.
Definition: cpg.h:63
cpg_iteration_type_t
The cpg_iteration_type_t enum.
Definition: cpg.h:94
uint64_t cpg_handle_t
cpg_handle_t
Definition: cpg.h:53
uint64_t cpg_iteration_handle_t
cpg_iteration_handle_t
Definition: cpg.h:58
cpg_model_t
The cpg_model_t enum.
Definition: cpg.h:103
#define CPG_MODEL_V1_DELIVER_INITIAL_TOTEM_CONF
Definition: cpg.h:193
#define CPG_MEMBERS_MAX
Definition: cpg.h:125
@ CPG_FLOW_CONTROL_DISABLED
flow control is disabled - new messages may be sent
Definition: cpg.h:74
@ CPG_ITERATION_ONE_GROUP
Definition: cpg.h:96
@ CPG_ITERATION_ALL
Definition: cpg.h:97
@ CPG_ITERATION_NAME_ONLY
Definition: cpg.h:95
@ CPG_MODEL_V1
Definition: cpg.h:104
qb_handle_t hdb_handle_t
Definition: hdb.h:52
@ LIBCPG_PARTIAL_FIRST
Definition: ipc_cpg.h:104
@ LIBCPG_PARTIAL_LAST
Definition: ipc_cpg.h:106
@ LIBCPG_PARTIAL_CONTINUED
Definition: ipc_cpg.h:105
#define CPG_ZC_PATH_LEN
Definition: ipc_cpg.h:43
@ MESSAGE_REQ_CPG_ZC_EXECUTE
Definition: ipc_cpg.h:60
@ MESSAGE_REQ_CPG_LOCAL_GET
Definition: ipc_cpg.h:53
@ MESSAGE_REQ_CPG_ITERATIONFINALIZE
Definition: ipc_cpg.h:56
@ MESSAGE_REQ_CPG_PARTIAL_MCAST
Definition: ipc_cpg.h:61
@ MESSAGE_REQ_CPG_ZC_ALLOC
Definition: ipc_cpg.h:58
@ MESSAGE_REQ_CPG_JOIN
Definition: ipc_cpg.h:49
@ MESSAGE_REQ_CPG_ZC_FREE
Definition: ipc_cpg.h:59
@ MESSAGE_REQ_CPG_ITERATIONINITIALIZE
Definition: ipc_cpg.h:54
@ MESSAGE_REQ_CPG_FINALIZE
Definition: ipc_cpg.h:57
@ MESSAGE_REQ_CPG_MEMBERSHIP
Definition: ipc_cpg.h:52
@ MESSAGE_REQ_CPG_LEAVE
Definition: ipc_cpg.h:50
@ MESSAGE_REQ_CPG_ITERATIONNEXT
Definition: ipc_cpg.h:55
@ MESSAGE_REQ_CPG_MCAST
Definition: ipc_cpg.h:51
@ MESSAGE_RES_CPG_PARTIAL_DELIVER_CALLBACK
Definition: ipc_cpg.h:85
@ MESSAGE_RES_CPG_DELIVER_CALLBACK
Definition: ipc_cpg.h:73
@ MESSAGE_RES_CPG_TOTEM_CONFCHG_CALLBACK
Definition: ipc_cpg.h:81
@ MESSAGE_RES_CPG_CONFCHG_CALLBACK
Definition: ipc_cpg.h:72
DECLARE_HDB_DATABASE(cpg_handle_t_db, cpg_inst_free)
#define CPG_MEMORY_MAP_UMASK
Definition: lib/cpg.c:81
#define MAX_RETRIES
Definition: lib/cpg.c:76
#define IPC_REQUEST_SIZE
Definition: lib/util.h:49
cs_error_t hdb_error_to_cs(int res)
#define IPC_DISPATCH_SIZE
Definition: lib/util.h:51
coroipcs_zc_header struct
Definition: ipc_cpg.h:498
uint64_t server_address
Definition: ipc_cpg.h:500
The cpg_address struct.
Definition: cpg.h:110
uint32_t pid
Definition: cpg.h:112
uint32_t nodeid
Definition: cpg.h:111
uint32_t nodeid
Definition: lib/cpg.c:86
uint32_t assembly_buf_ptr
Definition: lib/cpg.c:89
struct qb_list_head list
Definition: lib/cpg.c:85
char * assembly_buf
Definition: lib/cpg.c:88
uint32_t pid
Definition: lib/cpg.c:87
The cpg_callbacks_t struct.
Definition: cpg.h:181
cpg_deliver_fn_t cpg_deliver_fn
Definition: cpg.h:182
cpg_confchg_fn_t cpg_confchg_fn
Definition: cpg.h:183
cpg_model_data_t model_data
Definition: lib/cpg.c:97
int finalize
Definition: lib/cpg.c:94
void * context
Definition: lib/cpg.c:95
uint32_t max_msg_size
Definition: lib/cpg.c:101
qb_ipcc_connection_t * c
Definition: lib/cpg.c:93
struct qb_list_head assembly_list_head
Definition: lib/cpg.c:102
struct qb_list_head iteration_list_head
Definition: lib/cpg.c:100
cpg_model_v1_data_t model_v1_data
Definition: lib/cpg.c:98
The cpg_iteration_description_t struct.
Definition: cpg.h:130
qb_ipcc_connection_t * conn
Definition: lib/cpg.c:110
struct qb_list_head list
Definition: lib/cpg.c:112
cpg_iteration_handle_t cpg_iteration_handle
Definition: lib/cpg.c:109
hdb_handle_t executive_iteration_handle
Definition: lib/cpg.c:111
struct qb_list_head list
Definition: exec/cpg.c:161
The cpg_model_data_t struct.
Definition: cpg.h:189
cpg_model_t model
Definition: cpg.h:190
The cpg_model_v1_data_t struct.
Definition: cpg.h:198
cpg_totem_confchg_fn_t cpg_totem_confchg_fn
Definition: cpg.h:202
unsigned int flags
Definition: cpg.h:203
cpg_confchg_fn_t cpg_confchg_fn
Definition: cpg.h:201
cpg_deliver_fn_t cpg_deliver_fn
Definition: cpg.h:200
The cpg_name struct.
Definition: cpg.h:120
uint32_t length
Definition: cpg.h:121
The cpg_ring_id struct.
Definition: cpg.h:139
mar_cpg_address_t struct
Definition: ipc_cpg.h:155
mar_req_coroipcc_zc_alloc_t struct
Definition: ipc_cpg.h:472
mar_req_coroipcc_zc_execute_t struct
Definition: ipc_cpg.h:490
mar_req_coroipcc_zc_free_t struct
Definition: ipc_cpg.h:481
The req_lib_cpg_finalize struct.
Definition: ipc_cpg.h:268
The req_lib_cpg_iterationfinalize struct.
Definition: ipc_cpg.h:457
The req_lib_cpg_iterationinitialize struct.
Definition: ipc_cpg.h:424
The req_lib_cpg_iterationnext struct.
Definition: ipc_cpg.h:441
The req_lib_cpg_join struct.
Definition: ipc_cpg.h:251
The req_lib_cpg_leave struct.
Definition: ipc_cpg.h:408
The req_lib_cpg_local_get struct.
Definition: ipc_cpg.h:282
The req_lib_cpg_mcast struct.
Definition: ipc_cpg.h:304
The req_lib_cpg_membership_get struct.
Definition: ipc_cpg.h:367
The req_lib_cpg_partial_mcast struct.
Definition: ipc_cpg.h:314
The res_lib_cpg_confchg_callback struct.
Definition: ipc_cpg.h:384
mar_cpg_address_t member_list[]
Definition: ipc_cpg.h:390
Message from another node.
Definition: ipc_cpg.h:333
The res_lib_cpg_finalize struct.
Definition: ipc_cpg.h:275
The res_lib_cpg_iterationfinalize struct.
Definition: ipc_cpg.h:465
The res_lib_cpg_iterationinitialize struct.
Definition: ipc_cpg.h:433
The res_lib_cpg_iterationnext struct.
Definition: ipc_cpg.h:449
The res_lib_cpg_join struct.
Definition: ipc_cpg.h:261
The res_lib_cpg_leave struct.
Definition: ipc_cpg.h:417
The res_lib_cpg_local_get struct.
Definition: ipc_cpg.h:289
The res_lib_cpg_mcast struct.
Definition: ipc_cpg.h:326
The res_lib_cpg_membership_get struct.
Definition: ipc_cpg.h:375
mar_cpg_address_t member_list[PROCESSOR_COUNT_MAX]
Definition: ipc_cpg.h:378
The res_lib_cpg_partial_deliver_callback struct.
Definition: ipc_cpg.h:345
The res_lib_cpg_partial_send struct.
Definition: ipc_cpg.h:297
The res_lib_cpg_totem_confchg_callback struct.
Definition: ipc_cpg.h:398
int guarantee
Definition: totemsrp.c:6
struct memb_ring_id ring_id
Definition: totemsrp.c:4
struct totem_message_header header
Definition: totemsrp.c:0