69static int32_t ipc_not_enough_fds_left = 0;
70static int32_t ipc_fc_is_quorate;
71static int32_t ipc_fc_totem_queue_level;
72static int32_t ipc_fc_sync_in_process;
73static int32_t ipc_allow_connections = 0;
75#define CS_IPCS_MAPPER_SERV_NAME 256
91static int32_t cs_ipcs_job_add(
enum qb_loop_priority p,
void *data, qb_loop_job_dispatch_fn fn);
92static int32_t cs_ipcs_dispatch_add(
enum qb_loop_priority p, int32_t fd, int32_t events,
93 void *data, qb_ipcs_dispatch_fn_t fn);
94static int32_t cs_ipcs_dispatch_mod(
enum qb_loop_priority p, int32_t fd, int32_t events,
95 void *data, qb_ipcs_dispatch_fn_t fn);
96static int32_t cs_ipcs_dispatch_del(int32_t fd);
97static void outq_flush (
void *data);
100static struct qb_ipcs_poll_handlers corosync_poll_funcs = {
101 .job_add = cs_ipcs_job_add,
102 .dispatch_add = cs_ipcs_dispatch_add,
103 .dispatch_mod = cs_ipcs_dispatch_mod,
104 .dispatch_del = cs_ipcs_dispatch_del,
107static int32_t cs_ipcs_connection_accept (qb_ipcs_connection_t *c, uid_t euid, gid_t egid);
108static void cs_ipcs_connection_created(qb_ipcs_connection_t *c);
109static int32_t cs_ipcs_msg_process(qb_ipcs_connection_t *c,
110 void *data,
size_t size);
111static int32_t cs_ipcs_connection_closed (qb_ipcs_connection_t *c);
112static void cs_ipcs_connection_destroyed (qb_ipcs_connection_t *c);
114static struct qb_ipcs_service_handlers corosync_service_funcs = {
115 .connection_accept = cs_ipcs_connection_accept,
116 .connection_created = cs_ipcs_connection_created,
117 .msg_process = cs_ipcs_msg_process,
118 .connection_closed = cs_ipcs_connection_closed,
119 .connection_destroyed = cs_ipcs_connection_destroyed,
124static const char* cs_ipcs_serv_short_name(int32_t service_id)
127 switch (service_id) {
161 ipc_allow_connections = allow;
166 if (ipcs_mapper[service_id].inst) {
167 qb_ipcs_destroy(ipcs_mapper[service_id].inst);
168 ipcs_mapper[service_id].
inst = NULL;
173static int32_t cs_ipcs_connection_accept (qb_ipcs_connection_t *c, uid_t euid, gid_t egid)
175 int32_t service = qb_ipcs_service_id_get(c);
179 if (!ipc_allow_connections) {
185 ipcs_mapper[service].inst == NULL) {
189 if (ipc_not_enough_fds_left) {
193 if (euid == 0 || egid == 0) {
218static char * pid_to_name (pid_t pid,
char *out_name,
size_t name_len)
226 snprintf (fname, 32,
"/proc/%d/stat", pid);
227 fp = fopen (fname,
"r");
232 if (fgets (buf,
sizeof (buf), fp) == NULL) {
238 name = strrchr (buf,
'(');
246 rest = strrchr (buf,
')');
248 if (rest == NULL || rest[1] !=
' ') {
257 strncpy (out_name, name, name_len - 1);
258 out_name[name_len - 1] =
'\0';
262static void cs_ipcs_connection_created(qb_ipcs_connection_t *c)
266 struct qb_ipcs_connection_stats stats;
271 service = qb_ipcs_service_id_get(c);
274 context = calloc(1, size);
275 if (context == NULL) {
276 qb_ipcs_disconnect(c);
285 qb_ipcs_context_set(c, context);
288 log_printf(LOG_ERR,
"lib_init_fn failed, disconnecting");
289 qb_ipcs_disconnect(c);
293 qb_ipcs_connection_stats_get(c, &stats, QB_FALSE);
295 if (!pid_to_name (stats.client_pid, context->
proc_name,
sizeof(context->
proc_name))) {
304 qb_ipcs_connection_ref(conn);
309 qb_ipcs_connection_unref(conn);
315 cnx = qb_ipcs_context_get(conn);
316 return &cnx->
data[0];
319static void cs_ipcs_connection_destroyed (qb_ipcs_connection_t *c)
322 struct qb_list_head *list, *tmp_iter;
327 context = qb_ipcs_context_get(c);
329 qb_list_for_each_safe(
list, tmp_iter, &(context->
outq_head)) {
340static int32_t cs_ipcs_connection_closed (qb_ipcs_connection_t *c)
343 int32_t service = qb_ipcs_service_id_get(c);
344 struct qb_ipcs_connection_stats stats;
354 qb_ipcs_connection_stats_get(c, &stats, QB_FALSE);
364 const struct iovec *iov,
365 unsigned int iov_len)
367 int32_t rc = qb_ipcs_response_sendv(conn, iov, iov_len);
376 int32_t rc = qb_ipcs_response_send(conn, msg, mlen);
383static void outq_flush (
void *data)
385 qb_ipcs_connection_t *conn = data;
386 struct qb_list_head *list, *tmp_iter;
391 qb_list_for_each_safe(list, tmp_iter, &(context->
outq_head)) {
395 if (rc < 0 && rc != -EAGAIN) {
397 qb_perror(LOG_ERR,
"qb_ipcs_event_send");
399 }
else if (rc == -EAGAIN) {
410 if (qb_list_empty (&context->
outq_head)) {
421static void msg_send_or_queue(qb_ipcs_connection_t *conn,
const struct iovec *iov, uint32_t iov_len)
425 int32_t bytes_msg = 0;
430 for (i = 0; i < iov_len; i++) {
431 bytes_msg += iov[i].iov_len;
435 assert(qb_list_empty (&context->
outq_head));
436 rc = qb_ipcs_event_sendv(conn, iov, iov_len);
437 if (rc == bytes_msg) {
453 qb_ipcs_disconnect(conn);
459 qb_ipcs_disconnect(conn);
464 for (i = 0; i < iov_len; i++) {
465 memcpy (write_buf, iov[i].iov_base, iov[i].iov_len);
466 write_buf += iov[i].iov_len;
477 iov.iov_base = (
void *)msg;
479 msg_send_or_queue (conn, &iov, 1);
484 const struct iovec *iov,
485 unsigned int iov_len)
487 msg_send_or_queue(conn, iov, iov_len);
491static int32_t cs_ipcs_msg_process(qb_ipcs_connection_t *c,
492 void *data,
size_t size)
494 struct qb_ipc_response_header response;
495 struct qb_ipc_request_header *request_pt = (
struct qb_ipc_request_header *)data;
496 int32_t service = qb_ipcs_service_id_get(c);
498 int32_t is_async_call = QB_FALSE;
500 int sending_allowed_private_data;
506 &sending_allowed_private_data);
508 is_async_call = (service ==
CPG_SERVICE && request_pt->id == 2);
514 if (send_ok == -EINVAL) {
515 response.size =
sizeof (response);
519 cnx = qb_ipcs_context_get(c);
526 __func__, response.size, response.error);
528 qb_ipcs_response_send (c,
533 }
else if (send_ok < 0) {
534 cnx = qb_ipcs_context_get(c);
538 if (!is_async_call) {
542 response.size =
sizeof (response);
545 qb_ipcs_response_send (c,
550 "*** %s() (%d:%d - %d) %s!",
551 __func__, service, request_pt->id,
552 is_async_call, strerror(-send_ok));
566static int32_t cs_ipcs_job_add(
enum qb_loop_priority p,
void *
data, qb_loop_job_dispatch_fn fn)
571static int32_t cs_ipcs_dispatch_add(
enum qb_loop_priority p, int32_t fd, int32_t events,
572 void *
data, qb_ipcs_dispatch_fn_t fn)
577static int32_t cs_ipcs_dispatch_mod(
enum qb_loop_priority p, int32_t fd, int32_t events,
578 void *
data, qb_ipcs_dispatch_fn_t fn)
583static int32_t cs_ipcs_dispatch_del(int32_t fd)
588static void cs_ipcs_low_fds_event(int32_t not_enough, int32_t fds_available)
590 ipc_not_enough_fds_left = not_enough;
603 return ipc_fc_totem_queue_level;
606static qb_loop_timer_handle ipcs_check_for_flow_control_timer;
607static void cs_ipcs_check_for_flow_control(
void)
616 fc_enabled = QB_IPCS_RATE_OFF;
617 if (ipc_fc_is_quorate == 1 ||
624 ipc_fc_sync_in_process == 0) {
625 fc_enabled = QB_FALSE;
632 fc_enabled = QB_FALSE;
634 fc_enabled = QB_IPCS_RATE_OFF_2;
638 qb_ipcs_request_rate_limit(ipcs_mapper[i].inst, fc_enabled);
643 qb_ipcs_request_rate_limit(ipcs_mapper[i].inst, QB_IPCS_RATE_FAST);
645 qb_ipcs_request_rate_limit(ipcs_mapper[i].inst, QB_IPCS_RATE_NORMAL);
647 qb_ipcs_request_rate_limit(ipcs_mapper[i].inst, QB_IPCS_RATE_SLOW);
652static void cs_ipcs_fc_quorum_changed(
int quorate,
void *context)
655 cs_ipcs_check_for_flow_control();
658static void cs_ipcs_totem_queue_level_changed(
enum totem_q_level level)
660 ipc_fc_totem_queue_level = level;
661 cs_ipcs_check_for_flow_control();
666 ipc_fc_sync_in_process = sync_in_process;
667 cs_ipcs_check_for_flow_control();
672 memcpy(ipcs_stats, &global_stats,
sizeof(global_stats));
678 qb_ipcs_connection_t *c, *prev;
681 if (
corosync_service[service_id] == NULL || ipcs_mapper[service_id].inst == NULL) {
685 qb_ipcs_stats_get(ipcs_mapper[service_id].inst, &ipcs_stats->
srv, QB_FALSE);
687 for (c = qb_ipcs_connection_first_get(ipcs_mapper[service_id].inst);
689 prev = c, c = qb_ipcs_connection_next_get(ipcs_mapper[service_id].inst, prev), qb_ipcs_connection_unref(prev)) {
691 cnx = qb_ipcs_context_get(c);
692 if (cnx == NULL)
continue;
693 if (c != conn_ptr)
continue;
695 qb_ipcs_connection_stats_get(c, &ipcs_stats->
conn, QB_FALSE);
696 if (ipcs_stats->
conn.client_pid != pid) {
713 qb_ipcs_connection_t *c, *prev;
717 memset(&global_stats, 0,
sizeof(global_stats));
720 if (!ipcs_mapper[service_id].inst) {
724 for (c = qb_ipcs_connection_first_get(ipcs_mapper[service_id].inst);
726 prev = c, c = qb_ipcs_connection_next_get(ipcs_mapper[service_id].inst, prev), qb_ipcs_connection_unref(prev)) {
728 qb_ipcs_connection_stats_get(c, &ipcs_stats.
conn, QB_TRUE);
731 cnx = qb_ipcs_context_get(c);
732 if (
cnx == NULL)
continue;
741static enum qb_ipc_type cs_get_ipc_type (
void)
745 enum qb_ipc_type ret = QB_IPC_NATIVE;
749 return QB_IPC_NATIVE;
752 if (strcmp(str,
"native") == 0) {
757 if (strcmp(str,
"shm") == 0) {
762 if (strcmp(str,
"socket") == 0) {
780 const char *serv_short_name;
782 serv_short_name = cs_ipcs_serv_short_name(service->
id);
786 "NOT Initializing IPC on %s [%d]",
794 return "qb_ipcs_run error";
797 ipcs_mapper[service->
id].
id = service->
id;
798 strcpy(ipcs_mapper[service->
id].
name, serv_short_name);
800 "Initializing IPC on %s [%d]",
801 ipcs_mapper[service->
id].
name,
802 ipcs_mapper[service->
id].
id);
803 ipcs_mapper[service->
id].
inst = qb_ipcs_create(ipcs_mapper[service->
id].
name,
804 ipcs_mapper[service->
id].
id,
806 &corosync_service_funcs);
807 assert(ipcs_mapper[service->
id].
inst);
808 qb_ipcs_poll_handlers_set(ipcs_mapper[service->
id].
inst,
809 &corosync_poll_funcs);
810 if (qb_ipcs_run(ipcs_mapper[service->
id].
inst) != 0) {
812 return "qb_ipcs_run error";
struct corosync_api_v1 * apidef_get(void)
#define SERVICES_COUNT_MAX
cs_error_t
The cs_error_t enum.
cs_error_t icmap_get_uint8(const char *key_name, uint8_t *u8)
#define ICMAP_KEYNAME_MAXLEN
Maximum length of key in icmap.
cs_error_t icmap_get_string(const char *key_name, char **str)
Shortcut for icmap_get for string type.
void * cs_ipcs_private_data_get(void *conn)
void cs_ipcs_sync_state_changed(int32_t sync_in_process)
int32_t cs_ipcs_service_destroy(int32_t service_id)
#define CS_IPCS_MAPPER_SERV_NAME
void cs_ipc_refcnt_inc(void *conn)
cs_error_t cs_ipcs_get_conn_stats(int service_id, uint32_t pid, void *conn_ptr, struct ipcs_conn_stats *ipcs_stats)
void cs_ipc_refcnt_dec(void *conn)
const char * cs_ipcs_service_init(struct corosync_service_engine *service)
int cs_ipcs_dispatch_iov_send(void *conn, const struct iovec *iov, unsigned int iov_len)
void cs_ipcs_get_global_stats(struct ipcs_global_stats *ipcs_stats)
LOGSYS_DECLARE_SUBSYS("MAIN")
int cs_ipcs_response_send(void *conn, const void *msg, size_t mlen)
int cs_ipcs_response_iov_send(void *conn, const struct iovec *iov, unsigned int iov_len)
int32_t cs_ipcs_q_level_get(void)
void cs_ipcs_clear_stats()
void cs_ipc_allow_connections(int32_t allow)
int cs_ipcs_dispatch_send(void *conn, const void *msg, size_t mlen)
#define LOGSYS_LEVEL_ERROR
#define log_printf(level, format, args...)
#define LOGSYS_LEVEL_INFO
#define LOGSYS_LEVEL_NOTICE
#define LOGSYS_LEVEL_WARNING
#define LOGSYS_LEVEL_DEBUG
void corosync_sending_allowed_release(void *sending_allowed_private_data)
qb_loop_t * cs_poll_handle_get(void)
void corosync_recheck_the_q_level(void *data)
int corosync_sending_allowed(unsigned int service, unsigned int id, const void *msg, void *sending_allowed_private_data)
struct corosync_service_engine * corosync_service[SERVICES_COUNT_MAX]
void stats_ipcs_add_connection(int service_id, uint32_t pid, void *ptr)
void stats_ipcs_del_connection(int service_id, uint32_t pid, void *ptr)
The corosync_api_v1 struct.
int(* quorum_register_callback)(quorum_callback_fn_t callback_fn, void *context)
void(* lib_handler_fn)(void *conn, const void *msg)
The corosync_service_engine struct.
int(* lib_exit_fn)(void *conn)
struct corosync_lib_handler * lib_engine
struct qb_list_head outq_head
char name[CS_IPCS_MAPPER_SERV_NAME]
struct qb_ipcs_connection_stats conn
struct cs_ipcs_conn_context cnx
Totem Single Ring Protocol.
void totempg_queue_level_register_callback(totem_queue_level_changed_fn)