48#include <sys/socket.h>
69#define MAP_ANONYMOUS MAP_ANON
76#define MAX_RETRIES 100
81#define CPG_MEMORY_MAP_UMASK 077
93 qb_ipcc_connection_t *
c;
104static void cpg_inst_free (
void *inst);
123coroipcc_msg_send_reply_receive (
124 qb_ipcc_connection_t *c,
125 const struct iovec *iov,
126 unsigned int iov_len,
130 return qb_to_cs_error(qb_ipcc_sendv_recv(c, iov, iov_len, res_msg, res_len,
140static void cpg_inst_free (
void *inst)
148 struct qb_list_head *iter, *tmp_iter;
159 hdb_handle_destroy (&cpg_handle_t_db, handle);
196 goto error_no_destroy;
200 if (error !=
CS_OK) {
201 goto error_no_destroy;
205 if (error !=
CS_OK) {
212 goto error_put_destroy;
237 hdb_handle_put (&cpg_handle_t_db, *handle);
242 hdb_handle_put (&cpg_handle_t_db, *handle);
244 hdb_handle_destroy (&cpg_handle_t_db, *handle);
259 if (error !=
CS_OK) {
267 hdb_handle_put (&cpg_handle_t_db, handle);
282 error = coroipcc_msg_send_reply_receive (
cpg_inst->
c,
288 cpg_inst_finalize (
cpg_inst, handle);
289 hdb_handle_put (&cpg_handle_t_db, handle);
302 if (error !=
CS_OK) {
308 hdb_handle_put (&cpg_handle_t_db, handle);
321 if (error !=
CS_OK) {
327 hdb_handle_put (&cpg_handle_t_db, handle);
340 if (error !=
CS_OK) {
346 hdb_handle_put (&cpg_handle_t_db, handle);
359 if (error !=
CS_OK) {
365 hdb_handle_put (&cpg_handle_t_db, handle);
383 struct qb_ipc_response_header *dispatch_data;
389 struct qb_list_head *iter, *tmp_iter;
399 if (error !=
CS_OK) {
411 dispatch_data = (
struct qb_ipc_response_header *)dispatch_buf;
413 errno_res = qb_ipcc_event_recv (
437 if (error !=
CS_OK) {
452 switch (dispatch_data->id) {
460 marshall_from_mar_cpg_name_t (
462 &res_cpg_deliver_callback->group_name);
466 res_cpg_deliver_callback->nodeid,
467 res_cpg_deliver_callback->pid,
468 &res_cpg_deliver_callback->message,
469 res_cpg_deliver_callback->msglen);
475 marshall_from_mar_cpg_name_t (
477 &res_cpg_partial_deliver_callback->group_name);
482 assembly_data = NULL;
485 if (current_assembly_data->
nodeid == res_cpg_partial_deliver_callback->nodeid && current_assembly_data->
pid == res_cpg_partial_deliver_callback->pid) {
486 assembly_data = current_assembly_data;
499 qb_list_del (&assembly_data->
list);
502 assembly_data = NULL;
506 if (!assembly_data) {
511 assembly_data->
nodeid = res_cpg_partial_deliver_callback->nodeid;
512 assembly_data->
pid = res_cpg_partial_deliver_callback->pid;
513 assembly_data->
assembly_buf = malloc(res_cpg_partial_deliver_callback->msglen);
520 qb_list_init (&assembly_data->
list);
526 res_cpg_partial_deliver_callback->message, res_cpg_partial_deliver_callback->fraglen);
527 assembly_data->
assembly_buf_ptr += res_cpg_partial_deliver_callback->fraglen;
532 res_cpg_partial_deliver_callback->nodeid,
533 res_cpg_partial_deliver_callback->pid,
535 res_cpg_partial_deliver_callback->msglen);
537 qb_list_del (&assembly_data->
list);
551 for (i = 0; i < res_cpg_confchg_callback->member_list_entries; i++) {
555 left_list_start = res_cpg_confchg_callback->
member_list +
556 res_cpg_confchg_callback->member_list_entries;
557 for (i = 0; i < res_cpg_confchg_callback->left_list_entries; i++) {
558 marshall_from_mar_cpg_address_t (&left_list[i],
559 &left_list_start[i]);
561 joined_list_start = res_cpg_confchg_callback->
member_list +
562 res_cpg_confchg_callback->member_list_entries +
563 res_cpg_confchg_callback->left_list_entries;
564 for (i = 0; i < res_cpg_confchg_callback->joined_list_entries; i++) {
565 marshall_from_mar_cpg_address_t (&joined_list[i],
566 &joined_list_start[i]);
568 marshall_from_mar_cpg_name_t (
570 &res_cpg_confchg_callback->group_name);
575 res_cpg_confchg_callback->member_list_entries,
577 res_cpg_confchg_callback->left_list_entries,
579 res_cpg_confchg_callback->joined_list_entries);
584 for (i = 0; i < res_cpg_confchg_callback->left_list_entries; i++) {
587 if (current_assembly_data->
nodeid != left_list[i].
nodeid || current_assembly_data->
pid != left_list[i].
pid)
590 qb_list_del (¤t_assembly_data->
list);
592 free(current_assembly_data);
604 marshall_from_mar_cpg_ring_id_t (&
ring_id, &res_cpg_totem_confchg_callback->ring_id);
605 for (i = 0; i < res_cpg_totem_confchg_callback->member_list_entries; i++) {
606 totem_member_list[i] = res_cpg_totem_confchg_callback->
member_list[i];
611 res_cpg_totem_confchg_callback->member_list_entries,
640 hdb_handle_put (&cpg_handle_t_db, handle);
659 if (error !=
CS_OK) {
682 error = coroipcc_msg_send_reply_receive (
cpg_inst->
c, iov, 1,
685 if (error !=
CS_OK) {
690 error = response.header.error;
693 hdb_handle_put (&cpg_handle_t_db, handle);
713 if (error !=
CS_OK) {
727 error = coroipcc_msg_send_reply_receive (
cpg_inst->
c, iov, 1,
730 if (error !=
CS_OK) {
738 hdb_handle_put (&cpg_handle_t_db, handle);
747 int *member_list_entries)
762 if (member_list_entries == NULL) {
767 if (error !=
CS_OK) {
780 error = coroipcc_msg_send_reply_receive (
cpg_inst->
c, &iov, 1,
783 if (error !=
CS_OK) {
795 marshall_from_mar_cpg_address_t (&member_list[i],
801 hdb_handle_put (&cpg_handle_t_db, handle);
808 unsigned int *local_nodeid)
817 if (error !=
CS_OK) {
827 error = coroipcc_msg_send_reply_receive (
cpg_inst->
c, &iov, 1,
830 if (error !=
CS_OK) {
839 hdb_handle_put (&cpg_handle_t_db, handle);
852 if (error !=
CS_OK) {
858 hdb_handle_put (&cpg_handle_t_db, handle);
864memory_map (
char *path,
const char *file,
void **buf,
size_t bytes)
873 long int sysconf_page_size;
876 snprintf (path, PATH_MAX,
"/dev/shm/%s", file);
880 (void)umask(old_umask);
885 (void)umask(old_umask);
891 res = ftruncate (fd, bytes);
893 goto error_close_unlink;
895 sysconf_page_size = sysconf(_SC_PAGESIZE);
896 if (sysconf_page_size <= 0) {
897 goto error_close_unlink;
899 page_size = sysconf_page_size;
900 buffer = malloc (page_size);
901 if (buffer == NULL) {
902 goto error_close_unlink;
904 memset (buffer, 0, page_size);
905 for (i = 0; i < (bytes / page_size); i++) {
907 written = write (fd, buffer, page_size);
908 if (written == -1 && errno == EINTR) {
911 if (written != page_size) {
913 goto error_close_unlink;
918 addr = mmap (NULL, bytes, PROT_READ | PROT_WRITE,
921 if (
addr == MAP_FAILED) {
922 goto error_close_unlink;
925 madvise(
addr, bytes, MADV_NOSYNC);
952 struct qb_ipc_response_header res_coroipcs_zc_alloc;
960 if (error !=
CS_OK) {
965 assert(memory_map (path,
"corosync_zerocopy-XXXXXX", &buf, map_size) != -1);
969 munmap (buf, map_size);
975 req_coroipcc_zc_alloc.map_size = map_size;
976 strcpy (req_coroipcc_zc_alloc.path_to_file, path);
978 iovec.iov_base = (
void *)&req_coroipcc_zc_alloc;
981 error = coroipcc_msg_send_reply_receive (
985 &res_coroipcs_zc_alloc,
986 sizeof (
struct qb_ipc_response_header));
988 if (error !=
CS_OK) {
997 hdb_handle_put (&cpg_handle_t_db, handle);
1009 struct qb_ipc_response_header res_coroipcs_zc_free;
1014 if (error !=
CS_OK) {
1020 req_coroipcc_zc_free.map_size =
header->map_size;
1021 req_coroipcc_zc_free.server_address =
header->server_address;
1023 iovec.iov_base = (
void *)&req_coroipcc_zc_free;
1026 error = coroipcc_msg_send_reply_receive (
1030 &res_coroipcs_zc_free,
1031 sizeof (
struct qb_ipc_response_header));
1033 if (error !=
CS_OK) {
1045 hdb_handle_put (&cpg_handle_t_db, handle);
1065 if (error !=
CS_OK) {
1088 iovec.iov_base = (
void *)&req_coroipcc_zc_execute;
1091 error = coroipcc_msg_send_reply_receive (
1098 if (error !=
CS_OK) {
1105 hdb_handle_put (&cpg_handle_t_db, handle);
1114 const struct iovec *iovec,
1115 unsigned int iov_len)
1119 struct iovec iov[2];
1123 size_t iov_sent = 0;
1135 qb_ipcc_fc_enable_max_set(
cpg_inst->
c, 2);
1137 while (error ==
CS_OK && sent < msg_len) {
1144 iov[1].iov_len = iovec[i].iov_len - iov_sent;
1150 else if ((sent + iov[1].iov_len) == msg_len) {
1159 iov[1].iov_base = (
char *)iovec[i].iov_base + iov_sent;
1162 error = coroipcc_msg_send_reply_receive (
cpg_inst->
c, iov, 2,
1167 fprintf(stderr,
"sleep. counter=%d\n", retry_count);
1175 iov_sent += iov[1].iov_len;
1176 sent += iov[1].iov_len;
1179 if (iov_sent >= iovec[i].iov_len) {
1186 qb_ipcc_fc_enable_max_set(
cpg_inst->
c, 1);
1195 const struct iovec *iovec,
1196 unsigned int iov_len)
1201 struct iovec iov[64];
1206 if (error !=
CS_OK) {
1210 for (i = 0; i < iov_len; i++ ) {
1211 msg_len += iovec[i].iov_len;
1228 memcpy (&iov[1], iovec, iov_len *
sizeof (
struct iovec));
1230 qb_ipcc_fc_enable_max_set(
cpg_inst->
c, 2);
1232 qb_ipcc_fc_enable_max_set(
cpg_inst->
c, 1);
1235 hdb_handle_put (&cpg_handle_t_db, handle);
1256 if (cpg_iteration_handle == NULL) {
1272 if (error !=
CS_OK) {
1276 error =
hdb_error_to_cs (hdb_handle_create (&cpg_iteration_handle_t_db,
1278 if (error !=
CS_OK) {
1279 goto error_put_cpg_db;
1282 error =
hdb_error_to_cs (hdb_handle_get (&cpg_iteration_handle_t_db, *cpg_iteration_handle,
1284 if (error !=
CS_OK) {
1302 error = coroipcc_msg_send_reply_receive (
cpg_inst->
c,
1308 if (error !=
CS_OK) {
1309 goto error_put_destroy;
1318 hdb_handle_put (&cpg_iteration_handle_t_db, *cpg_iteration_handle);
1319 hdb_handle_put (&cpg_handle_t_db, handle);
1324 hdb_handle_put (&cpg_iteration_handle_t_db, *cpg_iteration_handle);
1326 hdb_handle_destroy (&cpg_iteration_handle_t_db, *cpg_iteration_handle);
1328 hdb_handle_put (&cpg_handle_t_db, handle);
1342 if (description == NULL) {
1346 error =
hdb_error_to_cs (hdb_handle_get (&cpg_iteration_handle_t_db, handle,
1348 if (error !=
CS_OK) {
1359 if (error !=
CS_OK) {
1366 if (error !=
CS_OK) {
1370 marshall_from_mar_cpg_iteration_description_t(
1377 hdb_handle_put (&cpg_iteration_handle_t_db, handle);
1392 error =
hdb_error_to_cs (hdb_handle_get (&cpg_iteration_handle_t_db, handle,
1394 if (error !=
CS_OK) {
1411 if (error !=
CS_OK) {
1421 hdb_handle_put (&cpg_iteration_handle_t_db, handle);
unsigned char addr[TOTEMIP_ADDRLEN]
cs_dispatch_flags_t
The cs_dispatch_flags_t enum.
@ CS_DISPATCH_ONE_NONBLOCKING
cs_error_t qb_to_cs_error(int result)
qb_to_cs_error
cs_error_t
The cs_error_t enum.
#define CS_IPC_TIMEOUT_MS
cs_error_t cpg_flow_control_state_get(cpg_handle_t handle, cpg_flow_control_state_t *flow_control_state)
cpg_flow_control_state_get
cs_error_t cpg_iteration_next(cpg_iteration_handle_t handle, struct cpg_iteration_description_t *description)
cpg_iteration_next
cs_error_t cpg_model_initialize(cpg_handle_t *handle, cpg_model_t model, cpg_model_data_t *model_data, void *context)
Create a new cpg connection, initialize with model.
cs_error_t cpg_max_atomic_msgsize_get(cpg_handle_t handle, uint32_t *size)
Get maximum size of a message that will not be fragmented.
cs_error_t cpg_mcast_joined(cpg_handle_t handle, cpg_guarantee_t guarantee, const struct iovec *iovec, unsigned int iov_len)
Multicast to groups joined with cpg_join.
cs_error_t cpg_leave(cpg_handle_t handle, const struct cpg_name *group)
Leave one or more groups.
cs_error_t cpg_context_get(cpg_handle_t handle, void **context)
Get contexts for a CPG handle.
cs_error_t cpg_fd_get(cpg_handle_t handle, int *fd)
Get a file descriptor on which to poll.
cs_error_t cpg_join(cpg_handle_t handle, const struct cpg_name *group)
Join one or more groups.
cs_error_t cpg_zcb_free(cpg_handle_t handle, void *buffer)
cpg_zcb_free
cs_error_t cpg_iteration_finalize(cpg_iteration_handle_t handle)
cpg_iteration_finalize
cs_error_t cpg_zcb_alloc(cpg_handle_t handle, size_t size, void **buffer)
cpg_zcb_alloc
cs_error_t cpg_membership_get(cpg_handle_t handle, struct cpg_name *group_name, struct cpg_address *member_list, int *member_list_entries)
Get membership information from cpg.
cs_error_t cpg_initialize(cpg_handle_t *handle, cpg_callbacks_t *callbacks)
Create a new cpg connection.
cs_error_t cpg_local_get(cpg_handle_t handle, unsigned int *local_nodeid)
cpg_local_get
cs_error_t cpg_iteration_initialize(cpg_handle_t handle, cpg_iteration_type_t iteration_type, const struct cpg_name *group, cpg_iteration_handle_t *cpg_iteration_handle)
cpg_iteration_initialize
cs_error_t cpg_context_set(cpg_handle_t handle, void *context)
Set contexts for a CPG handle.
cs_error_t cpg_finalize(cpg_handle_t handle)
Close the cpg handle.
cs_error_t cpg_dispatch(cpg_handle_t handle, cs_dispatch_flags_t dispatch_types)
Dispatch messages and configuration changes.
cs_error_t cpg_zcb_mcast_joined(cpg_handle_t handle, cpg_guarantee_t guarantee, void *msg, size_t msg_len)
cpg_zcb_mcast_joined
cpg_flow_control_state_t
The cpg_flow_control_state_t enum.
#define CPG_MAX_NAME_LENGTH
cpg_guarantee_t
The cpg_guarantee_t enum.
cpg_iteration_type_t
The cpg_iteration_type_t enum.
uint64_t cpg_handle_t
cpg_handle_t
uint64_t cpg_iteration_handle_t
cpg_iteration_handle_t
cpg_model_t
The cpg_model_t enum.
#define CPG_MODEL_V1_DELIVER_INITIAL_TOTEM_CONF
@ CPG_FLOW_CONTROL_DISABLED
flow control is disabled - new messages may be sent
@ CPG_ITERATION_ONE_GROUP
@ CPG_ITERATION_NAME_ONLY
@ LIBCPG_PARTIAL_CONTINUED
@ MESSAGE_REQ_CPG_ZC_EXECUTE
@ MESSAGE_REQ_CPG_LOCAL_GET
@ MESSAGE_REQ_CPG_ITERATIONFINALIZE
@ MESSAGE_REQ_CPG_PARTIAL_MCAST
@ MESSAGE_REQ_CPG_ZC_ALLOC
@ MESSAGE_REQ_CPG_ZC_FREE
@ MESSAGE_REQ_CPG_ITERATIONINITIALIZE
@ MESSAGE_REQ_CPG_FINALIZE
@ MESSAGE_REQ_CPG_MEMBERSHIP
@ MESSAGE_REQ_CPG_ITERATIONNEXT
@ MESSAGE_RES_CPG_PARTIAL_DELIVER_CALLBACK
@ MESSAGE_RES_CPG_DELIVER_CALLBACK
@ MESSAGE_RES_CPG_TOTEM_CONFCHG_CALLBACK
@ MESSAGE_RES_CPG_CONFCHG_CALLBACK
DECLARE_HDB_DATABASE(cpg_handle_t_db, cpg_inst_free)
#define CPG_MEMORY_MAP_UMASK
cs_error_t hdb_error_to_cs(int res)
#define IPC_DISPATCH_SIZE
uint32_t assembly_buf_ptr
The cpg_callbacks_t struct.
cpg_deliver_fn_t cpg_deliver_fn
cpg_confchg_fn_t cpg_confchg_fn
cpg_model_data_t model_data
struct qb_list_head assembly_list_head
struct qb_list_head iteration_list_head
cpg_model_v1_data_t model_v1_data
The cpg_iteration_description_t struct.
qb_ipcc_connection_t * conn
cpg_iteration_handle_t cpg_iteration_handle
hdb_handle_t executive_iteration_handle
The cpg_model_data_t struct.
The cpg_model_v1_data_t struct.
cpg_totem_confchg_fn_t cpg_totem_confchg_fn
cpg_confchg_fn_t cpg_confchg_fn
cpg_deliver_fn_t cpg_deliver_fn
mar_req_coroipcc_zc_alloc_t struct
mar_req_coroipcc_zc_execute_t struct
mar_req_coroipcc_zc_free_t struct
The req_lib_cpg_finalize struct.
The req_lib_cpg_iterationfinalize struct.
The req_lib_cpg_iterationinitialize struct.
The req_lib_cpg_iterationnext struct.
The req_lib_cpg_join struct.
The req_lib_cpg_leave struct.
The req_lib_cpg_local_get struct.
The req_lib_cpg_mcast struct.
The req_lib_cpg_membership_get struct.
The req_lib_cpg_partial_mcast struct.
The res_lib_cpg_confchg_callback struct.
mar_cpg_address_t member_list[]
Message from another node.
The res_lib_cpg_finalize struct.
The res_lib_cpg_iterationfinalize struct.
The res_lib_cpg_iterationinitialize struct.
The res_lib_cpg_iterationnext struct.
The res_lib_cpg_join struct.
The res_lib_cpg_leave struct.
The res_lib_cpg_local_get struct.
The res_lib_cpg_mcast struct.
The res_lib_cpg_membership_get struct.
mar_cpg_address_t member_list[PROCESSOR_COUNT_MAX]
The res_lib_cpg_partial_deliver_callback struct.
The res_lib_cpg_partial_send struct.
The res_lib_cpg_totem_confchg_callback struct.
mar_uint32_t member_list[]
struct memb_ring_id ring_id
struct totem_message_header header