Libthreadar 1.4.0
Loading...
Searching...
No Matches
tampon.hpp
Go to the documentation of this file.
1/*********************************************************************/
2// libthreadar - is a library providing several C++ classes to work with threads
3// Copyright (C) 2014-2020 Denis Corbin
4//
5// This file is part of libthreadar
6//
7// libthreadar is free software: you can redistribute it and/or modify
8// it under the terms of the GNU Lesser General Public License as published by
9// the Free Software Foundation, either version 3 of the License, or
10// (at your option) any later version.
11//
12// libthreadar is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15// GNU Lesser General Public License for more details.
16//
17// You should have received a copy of the GNU Lesser General Public License
18// along with libthreadar. If not, see <http://www.gnu.org/licenses/>
19//
20//----
21// to contact the author: dar.linux@free.fr
22/*********************************************************************/
23
24#ifndef LIBTHREADAR_TAMPON_H
25#define LIBTHREADAR_TAMPON_H
26
29
30#include "config.h"
31
32 // C system headers
33extern "C"
34{
35
36}
37 // C++ standard headers
38
39
40 // libthreadar headers
41#include "mutex.hpp"
42#include "exceptions.hpp"
43
44namespace libthreadar
45{
46
48
101 template <class T> class tampon
102 {
103 public:
105
109 tampon(unsigned int max_block, unsigned int block_size);
110
112 tampon(const tampon & ref) = delete;
113
115 tampon(tampon && ref) noexcept = default;
116
118 tampon & operator = (const tampon & ref) = delete;
119
121 tampon & operator = (tampon && ref) noexcept = default;
122
125 ~tampon();
126
128
133 void get_block_to_feed(T * & ptr, unsigned int & num);
134
136
140 void feed(T* ptr, unsigned int written);
141
143
147 void feed_cancel_get_block(T *ptr);
148
150
155 void fetch(T* & ptr, unsigned int & num);
156
158
161 void fetch_recycle(T* ptr);
162
164
173 void fetch_push_back(T *ptr, unsigned int new_num);
174
176
179 void fetch_push_back_and_skip(T *ptr, unsigned int new_num);
180
182 void fetch_skip_back();
183
185 bool has_readable_block_next() const;
186
188 bool is_empty() const;
189
191 bool is_not_empty() const { return !is_empty(); };
192
194 bool is_full() const { return full; }; // no need to acquire mutex "modif"
195
197 bool is_not_full() const { return !is_full(); };
198
200 bool is_quiet_full() const { unsigned int tmp = next_feed; shift_by_one(tmp); return tmp == fetch_head; };
201
203
205 unsigned int size() const { return table_size; };
206
208
210 unsigned int block_size() const { return alloc_size; };
211
213 unsigned int load() const { return fetch_head <= next_feed ? next_feed - fetch_head : table_size - (fetch_head - next_feed); };
214
216 void reset();
217
218 private:
219
220 struct atom
221 {
222 T* mem;
223 unsigned int data_size;
224
225 atom() { mem = nullptr; data_size = 0; };
226 };
227
228 mutex modif; //< to make critical section when non atomic action requires a status has not changed between a test and following action
229 atom *table; //< datastructure holding data in transit between two threads
230 unsigned int table_size; //< size of table, i.e. number of struct atom it holds
231 unsigned int alloc_size; //< size of allocated memory for each atom in table
232 unsigned int next_feed; //< index in table of the next atom to use for feeding table
233 unsigned int next_fetch; //< index in table of the next atom to use for fetch table
234 unsigned int fetch_head; //< the oldest object to be fetched
235 bool fetch_outside; //< if set to true, table's index pointed to by next_fetch is used by the fetcher
236 bool feed_outside; //< if set to true, table's index pointed to by next_feed is used by the feeder
237 mutex waiting_feeder; //< feeder thread may be stuck waiting on that semaphore if table is full
238 mutex waiting_fetcher; //< fetcher thread may be stuck waiting on that semaphore if table is empty
239 bool full; //< set when tampon is full
240 bool feeder_go_lock; //< true to inform fetcher than feeder is about to or has already acquire lock on waiting_feeder
241 bool feeder_lock_track; //< only used by feeder to lock on waiting_feeder once outside of critical section
242 bool fetcher_go_lock; //< true to inform feeder than fetcher is about to or has already acquire lock on waiting_fetcher
243 bool fetcher_lock_track; //< only used by fetcher to lock on waiting_fetcher once outside of critical section
244
245 bool is_empty_no_lock() const { return next_feed == fetch_head && !full; };
246
248 bool has_readable_block_next_no_lock() const { return next_feed != next_fetch || full; }
249
251 void shift_by_one(unsigned int & x) const;
252
254 void shift_back_by_one(unsigned int & x) const;
255
260 void shift_by_one_data_in_range(unsigned int begin, unsigned int end);
261
262 };
263
264 template <class T> tampon<T>::tampon(unsigned int max_block, unsigned int block_size)
265 {
266 table_size = max_block;
267 table = new atom[table_size];
268 if(table == nullptr)
269 throw exception_memory();
270 try
271 {
272 alloc_size = block_size;
273 try
274 {
275 for(unsigned int i = 0 ; i < table_size ; ++i)
276 {
277 table[i].mem = new T[alloc_size];
278 if(table[i].mem == nullptr)
279 throw exception_memory();
280 table[i].data_size = 0;
281 }
282 reset();
283 }
284 catch(...)
285 {
286 for(unsigned int i = 0; i < table_size ; ++i)
287 {
288 if(table[i].mem != nullptr)
289 delete [] table[i].mem;
290 }
291
292 throw;
293 }
294 }
295 catch(...)
296 {
297 if(table != nullptr)
298 delete [] table;
299 throw;
300 }
301 }
302
303
304 template <class T> tampon<T>::~tampon()
305 {
306 if(table != nullptr)
307 {
308 for(unsigned int i = 0 ; i < table_size ; ++i)
309 {
310 if(table[i].mem != nullptr)
311 delete [] table[i].mem;
312 }
313 delete [] table;
314 }
315 }
316
317 template <class T> void tampon<T>::get_block_to_feed(T * & ptr, unsigned int & num)
318 {
319 if(feed_outside)
320 throw exception_range("feed already out!");
321
322 modif.lock(); // --- critical section START
323 if(is_full())
324 {
325 feeder_go_lock = true; // inform fetcher that we will suspend in waiting_feeder
326 feeder_lock_track = true;// to suspend on waiting_feeder once we will be out of the critical section
327 }
328 modif.unlock(); // --- critical section END
329
330 if(feeder_lock_track)
331 {
332 feeder_lock_track = false;
333 waiting_feeder.lock(); // cannot lock inside a critical section ...
334 }
335
336 if(is_full())
337 throw THREADAR_BUG; // still full!
338
339 feed_outside = true;
340 ptr = table[next_feed].mem;
341 num = alloc_size;
342 }
343
344 template <class T> void tampon<T>::feed(T *ptr, unsigned int num)
345 {
346 if(!feed_outside)
347 throw exception_range("fetch not outside!");
348 feed_outside = false;
349
350 if(ptr != table[next_feed].mem)
351 throw exception_range("returned ptr is not the one given earlier for feeding");
352 table[next_feed].data_size = num;
353
354 modif.lock(); // --- critical section START
355 shift_by_one(next_feed);
356 if(next_feed == fetch_head)
357 full = true;
358 if(fetcher_go_lock)
359 {
360 fetcher_go_lock = false;
361 waiting_fetcher.unlock();
362 }
363 modif.unlock(); // --- critical section END
364 }
365
366 template <class T> void tampon<T>::feed_cancel_get_block(T *ptr)
367 {
368 if(!feed_outside)
369 throw exception_range("feed not outside!");
370 feed_outside = false;
371 if(ptr != table[next_feed].mem)
372 throw exception_range("returned ptr is not the one given earlier for feeding");
373 }
374
375 template <class T> void tampon<T>::fetch(T* & ptr, unsigned int & num)
376 {
377 if(fetch_outside)
378 throw exception_range("already fetched block outside");
379
380 modif.lock(); // --- critical section START
381 if(!has_readable_block_next_no_lock())
382 {
383 fetcher_go_lock = true; // to inform feeder that we will suspend on waiting_fetcher
384 fetcher_lock_track = true; // to suspend on waiting_fetcher once we will be out of the critical section
385 }
386 modif.unlock(); // --- critical section END
387
388 if(fetcher_lock_track)
389 {
390 fetcher_lock_track = false;
391 waiting_fetcher.lock(); // cannot lock inside a critical section ...
392 }
393
394 if(is_empty())
395 throw THREADAR_BUG;
396
397 fetch_outside = true;
398 ptr = table[next_fetch].mem;
399 num = table[next_fetch].data_size;
400 }
401
402 template <class T> void tampon<T>::fetch_recycle(T* ptr)
403 {
404 if(!fetch_outside)
405 throw exception_range("no block outside for fetching");
406 fetch_outside = false;
407 if(ptr != table[next_fetch].mem)
408 throw exception_range("returned ptr is no the one given earlier for fetching");
409
410 modif.lock(); // --- critical section START
411 if(next_fetch == fetch_head)
412 {
413
414 // no block were skipped
415
416 shift_by_one(fetch_head);
417 next_fetch = fetch_head;
418 full = false;
419 }
420 else
421 {
422 unsigned int tmp = next_fetch;
423 atom tmp_tom;
424
425 shift_by_one(tmp);
426 shift_by_one_data_in_range(tmp, next_feed);
427
428 // we also take into account the situation
429 // where a block has been given for feeding
430 // so the next call to feed() will match the
431 // expected address of the returned block
432 tmp = next_feed; // recording old position of next_feed
433 shift_back_by_one(next_feed);
434 // swapping contents between old next_feed position
435 // and new one:
436 tmp_tom = table[next_feed];
437 table[next_feed] = table[tmp];
438 table[tmp] = tmp_tom;
439 // done!
440
441 full = false;
442 }
443
444 if(feeder_go_lock)
445 {
446 feeder_go_lock = false;
447 waiting_feeder.unlock();
448 }
449 modif.unlock(); // --- critical section END
450 }
451
452 template <class T> void tampon<T>::fetch_push_back(T* ptr, unsigned int new_num)
453 {
454 if(!fetch_outside)
455 throw exception_range("no block outside for fetching");
456 fetch_outside = false;
457
458 if(ptr != table[next_fetch].mem)
459 throw exception_range("returned ptr is not the one given earlier for fetching");
460 table[next_fetch].data_size = new_num;
461 }
462
463 template <class T> void tampon<T>::fetch_push_back_and_skip(T *ptr,
464 unsigned int new_num)
465 {
466 fetch_push_back(ptr, new_num);
467 modif.lock(); // --- critical section START
468 if(full && next_fetch == next_feed) // reach last block feed, cannot skip it
469 throw exception_range("cannot skip the last fed block when the tampon is full");
470 shift_by_one(next_fetch);
471 modif.unlock(); // --- critical section END
472 }
473
474 template <class T> void tampon<T>::fetch_skip_back()
475 {
476 if(fetch_outside)
477 throw exception_range("cannot skip back fetching while a block is being fetched");
478 next_fetch = fetch_head;
479 }
480
481
482 template <class T> bool tampon<T>::has_readable_block_next() const
483 {
484 bool ret;
485
486 tampon<T> *me = const_cast<tampon<T> *>(this);
487 if(me == nullptr)
488 throw THREADAR_BUG;
489 me->modif.lock();
490 ret = has_readable_block_next_no_lock();
491 me->modif.unlock();
492
493 return ret;
494 }
495
496
497 template <class T> bool tampon<T>::is_empty() const
498 {
499 bool ret;
500
501 tampon<T> * me = const_cast<tampon<T> *>(this);
502 if(me == nullptr)
503 throw THREADAR_BUG;
504 me->modif.lock();
505 ret = is_empty_no_lock();
506 me->modif.unlock();
507
508 return ret;
509 }
510
511 template <class T> void tampon<T>::reset()
512 {
513 next_feed = 0;
514 next_fetch = 0;
515 fetch_head = 0;
516 fetch_outside = false;
517 feed_outside = false;
518 full = false;
519 feeder_go_lock = false;
520 feeder_lock_track = false;
521 fetcher_go_lock = false;
522 fetcher_lock_track = false;
523 (void)waiting_feeder.try_lock();
524 (void)waiting_fetcher.try_lock();
525 }
526
527 template <class T> void tampon<T>::shift_by_one(unsigned int & x) const
528 {
529 ++x;
530 if(x >= table_size)
531 x = 0;
532 }
533
534 template <class T> void tampon<T>::shift_back_by_one(unsigned int & x) const
535 {
536 if(x == 0)
537 x = table_size - 1;
538 else
539 --x;
540 }
541
542 template <class T> void tampon<T>::shift_by_one_data_in_range(unsigned int begin, unsigned int end)
543 {
544
545 if(begin != end)
546 {
547 unsigned int prev = begin;
548 shift_back_by_one(prev);
549 T* not_squeezed = table[prev].mem; // we will erase the address pointed to by mem so we keep it in memory here
550
551 while(begin != end)
552 {
553 table[prev] = table[begin]; // this copies both mem (the value of the pointer, not the pointed to) and data_size,
554 prev = begin;
555 shift_by_one(begin);
556 }
557
558 table[prev].mem = not_squeezed;
559 table[prev].data_size = 0; // by precaution
560 }
561 }
562
563
564} // end of namespace
565
566#endif
567
Exception used to report memory allocation failures.
Exception used to report out of range value or argument.
void unlock()
unlock the mutex
void lock()
lock the mutex
DEPRECATED see fast_tampon instead!
Definition tampon.hpp:102
unsigned int size() const
returns the size of the tampon in maximum number of block it can contain
Definition tampon.hpp:205
bool is_full() const
for feeder to know whether the next call to get_block_to_feed() will be blocking
Definition tampon.hpp:194
void fetch_recycle(T *ptr)
fetcher call - step 2
Definition tampon.hpp:402
bool is_not_full() const
to know whether the tampon is not full
Definition tampon.hpp:197
unsigned int load() const
returns the number of blocks currently used in the tampon (fed but not fetched)
Definition tampon.hpp:213
tampon & operator=(const tampon &ref)=delete
no assignment operator
tampon(const tampon &ref)=delete
no copy constructor
unsigned int block_size() const
returns the allocation size of each block
Definition tampon.hpp:210
void feed_cancel_get_block(T *ptr)
feeder call - step 2 alternative
Definition tampon.hpp:366
bool has_readable_block_next() const
to know whether next fetch will be blocking (not skipped blocks)
Definition tampon.hpp:482
tampon(unsigned int max_block, unsigned int block_size)
constructor
Definition tampon.hpp:264
bool is_quiet_full() const
returns true if only one slot is available before filling the tampon
Definition tampon.hpp:200
void get_block_to_feed(T *&ptr, unsigned int &num)
feeder call - step 1
Definition tampon.hpp:317
bool is_not_empty() const
to know whether the tampon is not empty
Definition tampon.hpp:191
tampon(tampon &&ref) noexcept=default
no move constructor
void fetch(T *&ptr, unsigned int &num)
fetcher call - step 1
Definition tampon.hpp:375
void feed(T *ptr, unsigned int written)
feeder call - step 2
Definition tampon.hpp:344
void fetch_push_back_and_skip(T *ptr, unsigned int new_num)
put back the fetched block and skip to next block for the next fetch()
Definition tampon.hpp:463
bool is_empty() const
to know whether the tampon has objects (readable or skipped)
Definition tampon.hpp:497
void fetch_push_back(T *ptr, unsigned int new_num)
fetcher call - step 2 alternative
Definition tampon.hpp:452
void fetch_skip_back()
reactivate all skipped blocks, next fetch() will be the oldest available block
Definition tampon.hpp:474
void reset()
reset the object fields and mutex as if the object was just created
Definition tampon.hpp:511
defines a set of exceptions that are used by libthreadar to report error situations
#define THREADAR_BUG
Macro used to throw an exception_bug when execution reaches that statement.
defines the mutex C++ class
This is the only namespace used in libthreadar and all symbols provided by libthreadar are member of ...
Definition barrier.hpp:46