corosync 3.1.7
cs_queue.h
Go to the documentation of this file.
/*
 * Copyright (c) 2002-2004 MontaVista Software, Inc.
 * Copyright (c) 2006-2011 Red Hat, Inc.
 *
 * All rights reserved.
 *
 * Author: Steven Dake (sdake@redhat.com)
 *
 * This software licensed under BSD license, the text of which follows:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * - Redistributions of source code must retain the above copyright notice,
 *   this list of conditions and the following disclaimer.
 * - Redistributions in binary form must reproduce the above copyright notice,
 *   this list of conditions and the following disclaimer in the documentation
 *   and/or other materials provided with the distribution.
 * - Neither the name of the MontaVista Software, Inc. nor the names of its
 *   contributors may be used to endorse or promote products derived from this
 *   software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
 * THE POSSIBILITY OF SUCH DAMAGE.
 */
35#ifndef CS_QUEUE_H_DEFINED
36#define CS_QUEUE_H_DEFINED
37
38#include <string.h>
39#include <stdlib.h>
40#include <pthread.h>
41#include <errno.h>
42#include "assert.h"
43
/**
 * Fixed-capacity circular queue of equally sized items stored in one
 * heap-allocated buffer.
 */
struct cs_queue {
	int head;			/* index of the slot the next added item is copied into */
	int tail;			/* index of the slot just before the oldest item */
	int used;			/* number of items currently queued */
	int usedhw;			/* largest value 'used' has reached (high-water mark) */
	int size;			/* number of slots in 'items' */
	void *items;			/* item storage, size * size_per_item bytes */
	int size_per_item;		/* size of a single item in bytes */
	int iterator;			/* slot index used by the cs_queue_item_iterator_* functions */
	pthread_mutex_t mutex;		/* guards the queue when threaded_mode_enabled is nonzero */
	int threaded_mode_enabled;	/* nonzero: every operation takes 'mutex' */
};
56
57static inline int cs_queue_init (struct cs_queue *cs_queue, int cs_queue_items, int size_per_item, int threaded_mode_enabled) {
58 cs_queue->head = 0;
59 cs_queue->tail = cs_queue_items - 1;
60 cs_queue->used = 0;
61 cs_queue->usedhw = 0;
62 cs_queue->size = cs_queue_items;
63 cs_queue->size_per_item = size_per_item;
64 cs_queue->threaded_mode_enabled = threaded_mode_enabled;
65
66 cs_queue->items = malloc (cs_queue_items * size_per_item);
67 if (cs_queue->items == 0) {
68 return (-ENOMEM);
69 }
70 memset (cs_queue->items, 0, cs_queue_items * size_per_item);
72 pthread_mutex_init (&cs_queue->mutex, NULL);
73 }
74 return (0);
75}
76
77static inline int cs_queue_reinit (struct cs_queue *cs_queue)
78{
80 pthread_mutex_lock (&cs_queue->mutex);
81 }
82 cs_queue->head = 0;
83 cs_queue->tail = cs_queue->size - 1;
84 cs_queue->used = 0;
85 cs_queue->usedhw = 0;
86
89 pthread_mutex_unlock (&cs_queue->mutex);
90 }
91 return (0);
92}
93
94static inline void cs_queue_free (struct cs_queue *cs_queue) {
96 pthread_mutex_destroy (&cs_queue->mutex);
97 }
98 free (cs_queue->items);
99}
100
101static inline int cs_queue_is_full (struct cs_queue *cs_queue) {
102 int full;
103
105 pthread_mutex_lock (&cs_queue->mutex);
106 }
107 full = ((cs_queue->size - 1) == cs_queue->used);
109 pthread_mutex_unlock (&cs_queue->mutex);
110 }
111 return (full);
112}
113
114static inline int cs_queue_is_empty (struct cs_queue *cs_queue) {
115 int empty;
116
118 pthread_mutex_lock (&cs_queue->mutex);
119 }
120 empty = (cs_queue->used == 0);
122 pthread_mutex_unlock (&cs_queue->mutex);
123 }
124 return (empty);
125}
126
127static inline void cs_queue_item_add (struct cs_queue *cs_queue, void *item)
128{
129 char *cs_queue_item;
130 int cs_queue_position;
131
133 pthread_mutex_lock (&cs_queue->mutex);
134 }
135 cs_queue_position = cs_queue->head;
136 cs_queue_item = cs_queue->items;
137 cs_queue_item += cs_queue_position * cs_queue->size_per_item;
138 memcpy (cs_queue_item, item, cs_queue->size_per_item);
139
140 assert (cs_queue->tail != cs_queue->head);
141
142 cs_queue->head = (cs_queue->head + 1) % cs_queue->size;
143 cs_queue->used++;
144 if (cs_queue->used > cs_queue->usedhw) {
146 }
148 pthread_mutex_unlock (&cs_queue->mutex);
149 }
150}
151
152static inline void *cs_queue_item_get (struct cs_queue *cs_queue)
153{
154 char *cs_queue_item;
155 int cs_queue_position;
156
158 pthread_mutex_lock (&cs_queue->mutex);
159 }
160 cs_queue_position = (cs_queue->tail + 1) % cs_queue->size;
161 cs_queue_item = cs_queue->items;
162 cs_queue_item += cs_queue_position * cs_queue->size_per_item;
164 pthread_mutex_unlock (&cs_queue->mutex);
165 }
166 return ((void *)cs_queue_item);
167}
168
169static inline void cs_queue_item_remove (struct cs_queue *cs_queue) {
171 pthread_mutex_lock (&cs_queue->mutex);
172 }
173 cs_queue->tail = (cs_queue->tail + 1) % cs_queue->size;
174
175 assert (cs_queue->tail != cs_queue->head);
176
177 cs_queue->used--;
178 assert (cs_queue->used >= 0);
180 pthread_mutex_unlock (&cs_queue->mutex);
181 }
182}
183
184static inline void cs_queue_items_remove (struct cs_queue *cs_queue, int rel_count)
185{
187 pthread_mutex_lock (&cs_queue->mutex);
188 }
189 cs_queue->tail = (cs_queue->tail + rel_count) % cs_queue->size;
190
191 assert (cs_queue->tail != cs_queue->head);
192
193 cs_queue->used -= rel_count;
195 pthread_mutex_unlock (&cs_queue->mutex);
196 }
197}
198
199
200static inline void cs_queue_item_iterator_init (struct cs_queue *cs_queue)
201{
203 pthread_mutex_lock (&cs_queue->mutex);
204 }
207 pthread_mutex_unlock (&cs_queue->mutex);
208 }
209}
210
211static inline void *cs_queue_item_iterator_get (struct cs_queue *cs_queue)
212{
213 char *cs_queue_item;
214 int cs_queue_position;
215
217 pthread_mutex_lock (&cs_queue->mutex);
218 }
219 cs_queue_position = (cs_queue->iterator) % cs_queue->size;
220 if (cs_queue->iterator == cs_queue->head) {
222 pthread_mutex_unlock (&cs_queue->mutex);
223 }
224 return (0);
225 }
226 cs_queue_item = cs_queue->items;
227 cs_queue_item += cs_queue_position * cs_queue->size_per_item;
229 pthread_mutex_unlock (&cs_queue->mutex);
230 }
231 return ((void *)cs_queue_item);
232}
233
234static inline int cs_queue_item_iterator_next (struct cs_queue *cs_queue)
235{
236 int next_res;
237
239 pthread_mutex_lock (&cs_queue->mutex);
240 }
242
243 next_res = cs_queue->iterator == cs_queue->head;
245 pthread_mutex_unlock (&cs_queue->mutex);
246 }
247 return (next_res);
248}
249
250static inline void cs_queue_avail (struct cs_queue *cs_queue, int *avail)
251{
253 pthread_mutex_lock (&cs_queue->mutex);
254 }
255 *avail = cs_queue->size - cs_queue->used - 2;
256 assert (*avail >= 0);
258 pthread_mutex_unlock (&cs_queue->mutex);
259 }
260}
261
262static inline int cs_queue_used (struct cs_queue *cs_queue) {
263 int used;
264
266 pthread_mutex_lock (&cs_queue->mutex);
267 }
268 used = cs_queue->used;
270 pthread_mutex_unlock (&cs_queue->mutex);
271 }
272
273 return (used);
274}
275
276static inline int cs_queue_usedhw (struct cs_queue *cs_queue) {
277 int usedhw;
278
280 pthread_mutex_lock (&cs_queue->mutex);
281 }
282
283 usedhw = cs_queue->usedhw;
284
286 pthread_mutex_unlock (&cs_queue->mutex);
287 }
288
289 return (usedhw);
290}
291
292#endif /* CS_QUEUE_H_DEFINED */
int used
Definition: cs_queue.h:47
int size
Definition: cs_queue.h:49
int threaded_mode_enabled
Definition: cs_queue.h:54
void * items
Definition: cs_queue.h:50
int usedhw
Definition: cs_queue.h:48
int head
Definition: cs_queue.h:45
int iterator
Definition: cs_queue.h:52
int size_per_item
Definition: cs_queue.h:51
pthread_mutex_t mutex
Definition: cs_queue.h:53
int tail
Definition: cs_queue.h:46