Libthreadar 1.4.0
Loading...
Searching...
No Matches
ratelier_gather.hpp
Go to the documentation of this file.
1/*********************************************************************/
2// libthreadar - is a library providing several C++ classes to work with threads
3// Copyright (C) 2014-2020 Denis Corbin
4//
5// This file is part of libthreadar
6//
7// libthreadar is free software: you can redistribute it and/or modify
8// it under the terms of the GNU Lesser General Public License as published by
9// the Free Software Foundation, either version 3 of the License, or
10// (at your option) any later version.
11//
12// libhtreadar is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15// GNU Lesser General Public License for more details.
16//
17// You should have received a copy of the GNU Lesser General Public License
18// along with libthreadar. If not, see <http://www.gnu.org/licenses/>
19//
20//----
21// to contact the author: dar.linux@free.fr
22/*********************************************************************/
23
24#ifndef LIBTHREADAR_RATELIER_GATHER_HPP
25#define LIBTHREADAR_RATELIER_GATHER_HPP
26
69
70
71
72#include "config.h"
73
74 // C system headers
75extern "C"
76{
77}
78 // C++ standard headers
79#include <vector>
80#include <map>
81#include <deque>
82#include <memory>
83
84 // libthreadar headers
85#include "mutex.hpp"
86
87
88namespace libthreadar
89{
91
96
97 template <class T> class ratelier_gather
98 {
99 public:
100 ratelier_gather(unsigned int size, signed int flag = 0);
101 ratelier_gather(const ratelier_gather & ref) = delete;
102 ratelier_gather(ratelier_gather && ref) = default;
103 ratelier_gather & operator = (const ratelier_gather & ref) = delete;
104 ratelier_gather & operator = (ratelier_gather && ref) noexcept = default;
105 virtual ~ratelier_gather() = default;
106
108
116 void worker_push_one(unsigned int slot, std::unique_ptr<T> & one, signed int flag = 0);
117
119
124 void gather(std::deque<std::unique_ptr<T> > & ones, std::deque<signed int> & flag);
125
127 void reset();
128
129 private:
130
131 static const unsigned int cond_pending_data = 0;
132 static const unsigned int cond_full = 1;
133
134 struct slot
135 {
136 std::unique_ptr<T> obj;
137 bool empty;
138 unsigned int index;
139 signed int flag;
140
141 slot(signed int val) { empty = true; flag = val; };
142 slot(const slot & ref) { obj.reset(); empty = ref.empty; index = ref.index; flag = ref.flag; };
143 };
144
145 unsigned int next_index;
146 std::vector<slot> table;
147 std::map<unsigned int, unsigned int> corres;
148 std::deque<unsigned int> empty_slot;
150 };
151
152 template <class T> ratelier_gather<T>::ratelier_gather(unsigned int size, signed int flag):
153 table(size, slot(flag)),
154 verrou(2)
155 {
156 next_index = 0;
157
158 for(unsigned int i = 0; i < size; ++i)
159 empty_slot.push_back(i);
160 }
161
162 template <class T> void ratelier_gather<T>::worker_push_one(unsigned int slot, std::unique_ptr<T> & one, signed int flag)
163 {
164 verrou.lock();
165
166 try
167 {
168 while(empty_slot.empty() // no free slot available
169 || ((empty_slot.size() == 1 && slot != next_index) // one slot available and we do not provide the lowest expecting slot num
170 && corres.begin() != corres.end() && (corres.begin())->first != next_index)) // and lowest slot is still not received
171 verrou.wait(cond_full);
172
173 std::map<unsigned int, unsigned int>::iterator it = corres.find(slot);
174 unsigned int index;
175
176 if(it != corres.end())
177 throw exception_range("the ratelier_gather index to fill is already used");
178
179 index = empty_slot.back();
180
181 // sanity checks
182
183 if(index >= table.size())
184 throw THREADAR_BUG;
185 if( ! table[index].empty)
186 throw THREADAR_BUG;
187
188 // recording the change
189
190 corres[slot] = index;
191 table[index].obj = std::move(one);
192 table[index].empty = false;
193 table[index].index = slot;
194 table[index].flag = flag;
195
196 empty_slot.pop_back();
197
198 if(verrou.get_waiting_thread_count(cond_pending_data) > 0)
199 if(corres.find(next_index) != corres.end()) // some data can be gathered
200 verrou.signal(cond_pending_data); // awaking the gathering thread
201 }
202 catch(...)
203 {
204 verrou.unlock(); // unlock first, as broadcast/signal may be the cause of the exception
205 verrou.broadcast(cond_pending_data);
206 verrou.broadcast(cond_full);
207 throw;
208 }
209 verrou.unlock();
210 }
211
212 template <class T> void ratelier_gather<T>::gather(std::deque<std::unique_ptr<T> > & ones, std::deque<signed int> & flag)
213 {
214 ones.clear();
215 flag.clear();
216
217 verrou.lock();
218 try
219 {
220 std::map<unsigned int, unsigned int>::iterator it;
221 std::map<unsigned int, unsigned int>::iterator tmp;
222
223 do
224 {
225 it = corres.begin();
226
227 while(it != corres.end())
228 {
229 if(it->first > next_index) // not continuous sequence
230 break; // exiting the inner while loop
231
232 if(it->first == next_index)
233 {
234
235 // sanity checks
236
237 if(it->second >= table.size())
238 throw THREADAR_BUG;
239 if(table[it->second].index != next_index)
240 throw THREADAR_BUG;
241 if(table[it->second].empty)
242 throw THREADAR_BUG;
243 if( ! table[it->second].obj)
244 throw THREADAR_BUG;
245
246 // recording the change
247
248 ones.push_back(std::move(table[it->second].obj));
249 flag.push_back(table[it->second].flag);
250
251 table[it->second].empty = true;
252 empty_slot.push_back(it->second);
253 tmp = it;
254 ++it;
255 corres.erase(tmp);
256 ++next_index;
257 }
258 else // integer overload occured for the index
259 ++it; // skipping this entry
260 }
261
262 if(ones.empty())
263 verrou.wait(cond_pending_data);
264 }
265 while(ones.empty());
266
267 if(verrou.get_waiting_thread_count(cond_full) > 0)
268 verrou.broadcast(cond_full); // awake all pending workers
269 }
270 catch(...)
271 {
272 verrou.unlock(); // unlock first, as broadcast() may be the cause of the exception
273 verrou.broadcast(cond_pending_data);
274 verrou.broadcast(cond_full);
275 throw;
276 }
277 verrou.unlock();
278
279 if(ones.size() != flag.size())
280 throw THREADAR_BUG;
281 }
282
283 template <class T> void ratelier_gather<T>::reset()
284 {
285 unsigned int size = table.size();
286 next_index = 0;
287 corres.clear();
288 empty_slot.clear();
289
290 for(unsigned int i = 0; i < size; ++i)
291 {
292 table[i].obj.reset();
293 table[i].empty = true;
294 empty_slot.push_back(i);
295 }
296
297 verrou.lock();
298 verrou.broadcast(cond_pending_data);
299 verrou.broadcast(cond_full);
300 verrou.unlock();
301 }
302
303
304} // end of namespace
305
306#endif
Wrapper around the Posix pthread_cond_t object and its associated mutex.
Definition condition.hpp:46
Exception used to report out or range value or argument.
the class ratelier_gather has a fixed length range of slots of arbitrary defined object type
void worker_push_one(unsigned int slot, std::unique_ptr< T > &one, signed int flag=0)
provides to a worker thread a mean to given data with its associated index to a gathering thread
void gather(std::deque< std::unique_ptr< T > > &ones, std::deque< signed int > &flag)
obtain the lowest continuous filled slots of the ratelier_gather and free them
void reset()
reset the object in its prestine state
#define THREADAR_BUG
Macro used to throw an exception_bug when execution reach that statement.
defines the mutex C++ class
This is the only namespace used in libthreadar and all symbols provided by libthreadar are member of ...
Definition barrier.hpp:46