aGrUM  0.14.2
recordCounter_tpl.h
Go to the documentation of this file.
1 /***************************************************************************
2  * Copyright (C) 2005 by Christophe GONZALES and Pierre-Henri WUILLEMIN *
3  * {prenom.nom}_at_lip6.fr *
4  * *
5  * This program is free software; you can redistribute it and/or modify *
6  * it under the terms of the GNU General Public License as published by *
7  * the Free Software Foundation; either version 2 of the License, or *
8  * (at your option) any later version. *
9  * *
10  * This program is distributed in the hope that it will be useful, *
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13  * GNU General Public License for more details. *
14  * *
15  * You should have received a copy of the GNU General Public License *
16  * along with this program; if not, write to the *
17  * Free Software Foundation, Inc., *
18  * 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. *
19  ***************************************************************************/
27 
28 
29 #ifndef DOXYGEN_SHOULD_SKIP_THIS
30 
31 namespace gum {
32 
33  namespace learning {
34 
35 
37  template < template < typename > class ALLOC >
40  return __parsers.get_allocator();
41  }
42 
43 
45  template < template < typename > class ALLOC >
47  const DBRowGeneratorParser< ALLOC >& parser,
48  const std::vector< std::pair< std::size_t, std::size_t >,
49  ALLOC< std::pair< std::size_t, std::size_t > > >& ranges,
50  const Bijection< NodeId, std::size_t, ALLOC< std::size_t > >&
51  nodeId2columns,
52  const typename RecordCounter< ALLOC >::allocator_type& alloc) :
53  __parsers(alloc),
54  __ranges(alloc), __nodeId2columns(nodeId2columns),
55  __last_DB_countings(alloc), __last_DB_ids(alloc),
56  __last_nonDB_countings(alloc), __last_nonDB_ids(alloc) {
57  // check that the columns in nodeId2columns do belong to the database
58  const std::size_t db_nb_cols = parser.database().nbVariables();
59  for (auto iter = nodeId2columns.cbegin(); iter != nodeId2columns.cend();
60  ++iter) {
61  if (iter.second() >= db_nb_cols) {
62  GUM_ERROR(OutOfBounds,
63  "the mapping between ids and database columns "
64  << "is incorrect because Column " << iter.second()
65  << " does not belong to the database.");
66  }
67  }
68 
69  // create the parsers. There should always be at least one parser
70  if (__max_nb_threads < std::size_t(1)) __max_nb_threads = std::size_t(1);
71  __parsers.reserve(__max_nb_threads);
72  for (std::size_t i = std::size_t(0); i < __max_nb_threads; ++i)
73  __parsers.push_back(parser);
74 
75  // check that the ranges are within the bounds of the database and
76  // save them
77  __checkRanges(ranges);
78  __ranges.reserve(ranges.size());
79  for (const auto& range : ranges)
80  __ranges.push_back(range);
81 
82  // dispatch the ranges for the threads
83  __dispatchRangesToThreads();
84 
85  GUM_CONSTRUCTOR(RecordCounter);
86  }
87 
88 
90  template < template < typename > class ALLOC >
92  const DBRowGeneratorParser< ALLOC >& parser,
93  const Bijection< NodeId, std::size_t, ALLOC< std::size_t > >&
94  nodeId2columns,
95  const typename RecordCounter< ALLOC >::allocator_type& alloc) :
96  RecordCounter< ALLOC >(
97  parser,
98  std::vector< std::pair< std::size_t, std::size_t >,
99  ALLOC< std::pair< std::size_t, std::size_t > > >(),
100  nodeId2columns,
101  alloc) {}
102 
103 
105  template < template < typename > class ALLOC >
107  const RecordCounter< ALLOC >& from,
108  const typename RecordCounter< ALLOC >::allocator_type& alloc) :
109  __parsers(from.__parsers, alloc),
110  __ranges(from.__ranges, alloc),
111  __thread_ranges(from.__thread_ranges, alloc),
112  __nodeId2columns(from.__nodeId2columns),
113  __last_DB_countings(from.__last_DB_countings, alloc),
114  __last_DB_ids(from.__last_DB_ids),
115  __last_nonDB_countings(from.__last_nonDB_countings, alloc),
116  __last_nonDB_ids(from.__last_nonDB_ids),
117  __max_nb_threads(from.__max_nb_threads),
118  __min_nb_rows_per_thread(from.__min_nb_rows_per_thread) {
119  GUM_CONS_CPY(RecordCounter);
120  }
121 
122 
124  template < template < typename > class ALLOC >
125  RecordCounter< ALLOC >::RecordCounter(const RecordCounter< ALLOC >& from) :
126  RecordCounter< ALLOC >(from, from.getAllocator()) {}
127 
128 
130  template < template < typename > class ALLOC >
132  RecordCounter< ALLOC >&& from,
133  const typename RecordCounter< ALLOC >::allocator_type& alloc) :
134  __parsers(std::move(from.__parsers), alloc),
135  __ranges(std::move(from.__ranges), alloc),
136  __thread_ranges(std::move(from.__thread_ranges), alloc),
137  __nodeId2columns(std::move(from.__nodeId2columns)),
138  __last_DB_countings(std::move(from.__last_DB_countings), alloc),
139  __last_DB_ids(std::move(from.__last_DB_ids)),
140  __last_nonDB_countings(std::move(from.__last_nonDB_countings), alloc),
141  __last_nonDB_ids(std::move(from.__last_nonDB_ids)),
142  __max_nb_threads(from.__max_nb_threads),
143  __min_nb_rows_per_thread(from.__min_nb_rows_per_thread) {
144  GUM_CONS_MOV(RecordCounter);
145  }
146 
147 
149  template < template < typename > class ALLOC >
150  RecordCounter< ALLOC >::RecordCounter(RecordCounter< ALLOC >&& from) :
151  RecordCounter< ALLOC >(std::move(from), from.getAllocator()) {}
152 
153 
155  template < template < typename > class ALLOC >
156  RecordCounter< ALLOC >* RecordCounter< ALLOC >::clone(
157  const typename RecordCounter< ALLOC >::allocator_type& alloc) const {
158  ALLOC< RecordCounter< ALLOC > > allocator(alloc);
159  RecordCounter< ALLOC >* new_counter = allocator.allocate(1);
160  try {
161  allocator.construct(new_counter, *this, alloc);
162  } catch (...) {
163  allocator.deallocate(new_counter, 1);
164  throw;
165  }
166 
167  return new_counter;
168  }
169 
170 
172  template < template < typename > class ALLOC >
173  RecordCounter< ALLOC >* RecordCounter< ALLOC >::clone() const {
174  return clone(this->getAllocator());
175  }
176 
177 
179  template < template < typename > class ALLOC >
181  GUM_DESTRUCTOR(RecordCounter);
182  }
183 
184 
186  template < template < typename > class ALLOC >
187  RecordCounter< ALLOC >& RecordCounter< ALLOC >::
188  operator=(const RecordCounter< ALLOC >& from) {
189  if (this != &from) {
190  __parsers = from.__parsers;
191  __ranges = from.__ranges;
192  __thread_ranges = from.__thread_ranges;
193  __nodeId2columns = from.__nodeId2columns;
194  __last_DB_countings = from.__last_DB_countings;
195  __last_DB_ids = from.__last_DB_ids;
196  __last_nonDB_countings = from.__last_nonDB_countings;
197  __last_nonDB_ids = from.__last_nonDB_ids;
198  __max_nb_threads = from.__max_nb_threads;
199  __min_nb_rows_per_thread = from.__min_nb_rows_per_thread;
200  }
201  return *this;
202  }
203 
204 
206  template < template < typename > class ALLOC >
207  RecordCounter< ALLOC >& RecordCounter< ALLOC >::
208  operator=(RecordCounter< ALLOC >&& from) {
209  if (this != &from) {
210  __parsers = std::move(from.__parsers);
211  __ranges = std::move(from.__ranges);
212  __thread_ranges = std::move(from.__thread_ranges);
213  __nodeId2columns = std::move(from.__nodeId2columns);
214  __last_DB_countings = std::move(from.__last_DB_countings);
215  __last_DB_ids = std::move(from.__last_DB_ids);
216  __last_nonDB_countings = std::move(from.__last_nonDB_countings);
217  __last_nonDB_ids = std::move(from.__last_nonDB_ids);
218  __max_nb_threads = from.__max_nb_threads;
219  __min_nb_rows_per_thread = from.__min_nb_rows_per_thread;
220  }
221  return *this;
222  }
223 
224 
226  template < template < typename > class ALLOC >
228  __last_DB_countings.clear();
229  __last_DB_ids.clear();
230  __last_nonDB_countings.clear();
231  __last_nonDB_ids.clear();
232  }
233 
234 
236  template < template < typename > class ALLOC >
237  void RecordCounter< ALLOC >::setMaxNbThreads(const std::size_t nb) const {
238  if (nb == std::size_t(0) || !isOMP())
239  __max_nb_threads = std::size_t(1);
240  else
241  __max_nb_threads = nb;
242  }
243 
244 
246  template < template < typename > class ALLOC >
247  INLINE std::size_t RecordCounter< ALLOC >::nbThreads() const {
248  return __max_nb_threads;
249  }
250 
251 
252  // changes the number min of rows a thread should process in a
253  // multithreading context
254  template < template < typename > class ALLOC >
255  void
256  RecordCounter< ALLOC >::setMinNbRowsPerThread(const std::size_t nb) const {
257  if (nb == std::size_t(0))
258  __min_nb_rows_per_thread = std::size_t(1);
259  else
260  __min_nb_rows_per_thread = nb;
261  }
262 
263 
265  template < template < typename > class ALLOC >
266  INLINE std::size_t RecordCounter< ALLOC >::minNbRowsPerThread() const {
267  return __min_nb_rows_per_thread;
268  }
269 
270 
272  template < template < typename > class ALLOC >
273  void RecordCounter< ALLOC >::__raiseCheckException(
274  const std::vector< std::string, ALLOC< std::string > >& bad_vars) const {
275  // generate the exception
276  std::stringstream msg;
277  msg << "Counts cannot be performed on continuous variables. ";
278  msg << "Unfortunately the following variable";
279  if (bad_vars.size() == 1)
280  msg << " is continuous: " << bad_vars[0];
281  else {
282  msg << "s are continuous: ";
283  bool deja = false;
284  for (const auto& name : bad_vars) {
285  if (deja)
286  msg << ", ";
287  else
288  deja = true;
289  msg << name;
290  }
291  }
292  GUM_ERROR(TypeError, msg.str());
293  }
294 
295 
297  template < template < typename > class ALLOC >
298  void RecordCounter< ALLOC >::__checkDiscreteVariables(
299  const IdSet< ALLOC >& ids) const {
300  const std::size_t size = ids.size();
301  const DatabaseTable< ALLOC >& database = __parsers[0].data.database();
302 
303  if (__nodeId2columns.empty()) {
304  // check all the ids
305  for (std::size_t i = std::size_t(0); i < size; ++i) {
306  if (database.variable(i).varType() == VarType::Continuous) {
307  // here, var i does not correspond to a discrete variable.
308  // we check whether there are other non discrete variables, so that
309  // we can generate an exception mentioning all these variables
310  std::vector< std::string, ALLOC< std::string > > bad_vars{
311  database.variable(i).name()};
312  for (++i; i < size; ++i) {
313  if (database.variable(i).varType() == VarType::Continuous)
314  bad_vars.push_back(database.variable(i).name());
315  }
316  __raiseCheckException(bad_vars);
317  }
318  }
319  } else {
320  // check all the ids
321  for (std::size_t i = std::size_t(0); i < size; ++i) {
322  // get the position of the variable in the database
323  std::size_t pos = __nodeId2columns.second(ids[i]);
324 
325  if (database.variable(pos).varType() == VarType::Continuous) {
326  // here, id does not correspond to a discrete variable.
327  // we check whether there are other non discrete variables, so that
328  // we can generate an exception mentioning all these variables
329  std::vector< std::string, ALLOC< std::string > > bad_vars{
330  database.variable(pos).name()};
331  for (++i; i < size; ++i) {
332  pos = __nodeId2columns.second(ids[i]);
333  if (database.variable(pos).varType() == VarType::Continuous)
334  bad_vars.push_back(database.variable(pos).name());
335  }
336  __raiseCheckException(bad_vars);
337  }
338  }
339  }
340  }
341 
342 
344  template < template < typename > class ALLOC >
345  INLINE const Bijection< NodeId, std::size_t, ALLOC< std::size_t > >&
347  return __nodeId2columns;
348  }
349 
350 
352  template < template < typename > class ALLOC >
353  const DatabaseTable< ALLOC >& RecordCounter< ALLOC >::database() const {
354  return __parsers[0].data.database();
355  }
356 
357 
359  template < template < typename > class ALLOC >
360  INLINE const std::vector< double, ALLOC< double > >&
361  RecordCounter< ALLOC >::counts(const IdSet< ALLOC >& ids,
362  const bool check_discrete_vars) {
363  // if the idset is empty, return an empty vector
364  if (ids.empty()) {
365  __last_nonDB_ids.clear();
366  __last_nonDB_countings.clear();
367  return __last_nonDB_countings;
368  }
369 
370  // check whether we can extract the vector we wish to return from
371  // some already computed counting vector
372  if (__last_nonDB_ids.contains(ids))
373  return __extractFromCountings(
374  ids, __last_nonDB_ids, __last_nonDB_countings);
375  else if (__last_DB_ids.contains(ids))
376  return __extractFromCountings(ids, __last_DB_ids, __last_DB_countings);
377  else {
378  if (check_discrete_vars) __checkDiscreteVariables(ids);
379  return __countFromDatabase(ids);
380  }
381  }
382 
383 
384  // returns a mapping from the nodes ids to the columns of the database
385  // for a given sequence of ids
386  template < template < typename > class ALLOC >
387  HashTable< NodeId, std::size_t > RecordCounter< ALLOC >::__getNodeIds2Columns(
388  const IdSet< ALLOC >& ids) const {
389  HashTable< NodeId, std::size_t > res(ids.size());
390  if (__nodeId2columns.empty()) {
391  for (const auto id : ids) {
392  res.insert(id, std::size_t(id));
393  }
394  } else {
395  for (const auto id : ids) {
396  res.insert(id, __nodeId2columns.second(id));
397  }
398  }
399  return res;
400  }
401 
402 
/// Computes the counts over subset_ids by summing the cells of a counting
/// vector defined over superset_ids.
///
/// Three cases are handled: subset_ids is the beginning of superset_ids
/// (same order), subset_ids is the end of superset_ids (same order), or the
/// general case. In all cases, the result is saved into __last_nonDB_ids /
/// __last_nonDB_countings; if saving throws, this cache is cleared and the
/// exception is rethrown.
///
/// NOTE(review): every id of subset_ids is presumed to belong to
/// superset_ids (counts() only calls this method after a successful
/// contains() test) -- confirm for other callers.
/// Note that counts() may pass __last_nonDB_ids / __last_nonDB_countings as
/// superset_ids / superset_vect: these members are only overwritten at the
/// very end of each case, after the last read of the superset arguments.
///
/// @param subset_ids the ids over which the counts are requested
/// @param superset_ids the ids over which superset_vect is defined
/// @param superset_vect the counting vector over superset_ids
/// @return a reference to __last_nonDB_countings, holding the result
404  template < template < typename > class ALLOC >
405  INLINE std::vector< double, ALLOC< double > >&
406  RecordCounter< ALLOC >::__extractFromCountings(
407  const IdSet< ALLOC >& subset_ids,
408  const IdSet< ALLOC >& superset_ids,
409  const std::vector< double, ALLOC< double > >& superset_vect) {
410  // get a mapping between the node Ids and their columns in the database.
411  // This should be stored into __nodeId2columns, except if the latter is
412  // empty, in which case there is an identity mapping
413  const auto nodeId2columns = __getNodeIds2Columns(superset_ids);
414 
415  // we first determine the size of the output vector, the domain of
416  // each of its variables and their offsets in the output vector
417  const auto& database = __parsers[0].data.database();
418  std::size_t result_vect_size = std::size_t(1);
419  for (const auto id : subset_ids) {
420  result_vect_size *= database.domainSize(nodeId2columns[id]);
421  }
422 
423  // we create the output vector
424  const std::size_t subset_ids_size = std::size_t(subset_ids.size());
425  std::vector< double, ALLOC< double > > result_vect(result_vect_size, 0.0);
426 
427 
428  // check if the subset_ids is the beginning of the sequence of superset_ids
429  // if this is the case, then we can outer loop over the variables not in
430  // subset_ids and, for each iteration of this loop add a vector of size
431  // result_size to result_vect
432  bool subset_begin = true;
433  for (std::size_t i = 0; i < subset_ids_size; ++i) {
434  if (superset_ids.pos(subset_ids[i]) != i) {
435  subset_begin = false;
436  break;
437  }
438  }
439 
440  if (subset_begin) {
    // superset_vect is read as consecutive chunks of result_vect_size cells,
    // each chunk being added cell by cell to result_vect
441  const std::size_t superset_vect_size = superset_vect.size();
442  std::size_t i = std::size_t(0);
443  while (i < superset_vect_size) {
444  for (std::size_t j = std::size_t(0); j < result_vect_size; ++j, ++i) {
445  result_vect[j] += superset_vect[i];
446  }
447  }
448 
449  // save the subset_ids and the result vector
450  try {
451  __last_nonDB_ids = subset_ids;
452  __last_nonDB_countings = std::move(result_vect);
453  return __last_nonDB_countings;
454  } catch (...) {
455  __last_nonDB_ids.clear();
456  __last_nonDB_countings.clear();
457  throw;
458  }
459  }
460 
461 
462  // check if subset_ids is the end of the sequence of superset_ids.
463  // In this case, as above, there are two simple loops to perform the
464  // countings
465  bool subset_end = true;
466  const std::size_t superset_ids_size = std::size_t(superset_ids.size());
467  for (std::size_t i = 0; i < subset_ids_size; ++i) {
468  if (superset_ids.pos(subset_ids[i])
469  != i + superset_ids_size - subset_ids_size) {
470  subset_end = false;
471  break;
472  }
473  }
474 
475  if (subset_end) {
476  // determine the size of the vector corresponding to the variables
477  // not belonging to subset_ids
478  std::size_t vect_not_subset_size = std::size_t(1);
479  for (std::size_t i = std::size_t(0);
480  i < superset_ids_size - subset_ids_size;
481  ++i)
482  vect_not_subset_size *=
483  database.domainSize(nodeId2columns[superset_ids[i]]);
484 
485  // perform the two loops
    // each cell j of result_vect is the sum of vect_not_subset_size
    // consecutive cells of superset_vect
486  std::size_t i = std::size_t(0);
487  for (std::size_t j = std::size_t(0); j < result_vect_size; ++j) {
488  for (std::size_t k = std::size_t(0); k < vect_not_subset_size;
489  ++k, ++i) {
490  result_vect[j] += superset_vect[i];
491  }
492  }
493 
494  // save the subset_ids and the result vector
495  try {
496  __last_nonDB_ids = subset_ids;
497  __last_nonDB_countings = std::move(result_vect);
498  return __last_nonDB_countings;
499  } catch (...) {
500  __last_nonDB_ids.clear();
501  __last_nonDB_countings.clear();
502  throw;
503  }
504  }
505 
506 
507  // here subset_ids is a subset of superset_ids neither prefixing nor
508  // postfixing it. So the computation is somewhat more complicated.
509 
510  // We will parse the superset_vect sequentially (using ++ operator).
511  // Sometimes, we will need to change the offset of the cell of result_vect
512  // that will be affected, sometimes not. Vector before_incr will indicate
513  // whether we need to change the offset (value = 0) or not (value different
514  // from 0). Vector result_domain will indicate how this offset should be
515  // computed. Here is an example of the values of these vectors. Assume that
516  // superset_ids = <A,B,C,D,E> and subset_ids = <A,D,C>. Then, the three
517  // vectors before_incr, result_domain and result_offset are indexed w.r.t.
518  // A,C,D, i.e., w.r.t. the variables in subset_ids but ordered w.r.t.
519  // superset_ids (this is convenient as we will parse superset_vect
520  // sequentially). For a variable or a set of variables X, let M_X denote the
521  // domain size of X. Then the contents of the three vectors are as follows:
522  // before_incr = {0, M_B - 1, 0} (this means that whenever we iterate over
523  // B's values, the offset in result_vect does not change)
524  // result_domain = { M_A, M_C, M_D } (i.e., the domain sizes of the variables
525  // in subset_ids, ordered w.r.t. superset_ids)
526  // result_offset = { 1, M_A*M_D, M_A } (this corresponds to the offsets
527  // in result_vect of variables A, C and D)
528  // Vector superset_order = { 0, 2, 1} : this is a map from the indices of
529  // the variables in subset_ids to the indices of these variables in the
530  // three vectors described above. For instance, the "2" means that variable
531  // D (which is at index 1 in subset_ids) is located at index 2 in vector
532  // before_incr
533  std::vector< std::size_t > before_incr(subset_ids_size);
534  std::vector< std::size_t > result_domain(subset_ids_size);
535  std::vector< std::size_t > result_offset(subset_ids_size);
536  {
537  std::size_t result_domain_size = std::size_t(1);
538  std::size_t tmp_before_incr = std::size_t(1);
539  std::vector< std::size_t > superset_order(subset_ids_size);
540 
    // scan superset_ids in order (index h) until all the subset_ids have
    // been met (index j); tmp_before_incr accumulates the product of the
    // domain sizes of the skipped variables since the last subset variable
541  for (std::size_t h = std::size_t(0), j = std::size_t(0);
542  j < subset_ids_size;
543  ++h) {
544  if (subset_ids.exists(superset_ids[h])) {
545  before_incr[j] = tmp_before_incr - 1;
546  superset_order[subset_ids.pos(superset_ids[h])] = j;
547  tmp_before_incr = 1;
548  ++j;
549  } else {
550  tmp_before_incr *=
551  database.domainSize(nodeId2columns[superset_ids[h]]);
552  }
553  }
554 
555  // compute the offsets in the order of the superset_ids
556  for (std::size_t i = 0; i < subset_ids.size(); ++i) {
557  const std::size_t domain_size =
558  database.domainSize(nodeId2columns[subset_ids[i]]);
559  const std::size_t j = superset_order[i];
560  result_domain[j] = domain_size;
561  result_offset[j] = result_domain_size;
562  result_domain_size *= domain_size;
563  }
564  }
565 
    // working copies used as countdowns during the parsing:
    // result_value[k] = number of values of variable k still to be visited,
    // current_incr[k] = number of cells still to be read before variable k
    // changes, result_down[k] = amount subtracted from the offset when
    // variable k wraps around to its first value
566  std::vector< std::size_t > result_value(result_domain);
567  std::vector< std::size_t > current_incr(before_incr);
568  std::vector< std::size_t > result_down(result_offset);
569 
570  for (std::size_t j = std::size_t(0); j < result_down.size(); ++j) {
571  result_down[j] *= (result_domain[j] - 1);
572  }
573 
574  // now we can loop over the superset_vect to fill result_vect
575  const std::size_t superset_vect_size = superset_vect.size();
576  std::size_t the_result_offset = std::size_t(0);
577  for (std::size_t h = std::size_t(0); h < superset_vect_size; ++h) {
578  result_vect[the_result_offset] += superset_vect[h];
579 
580  // update the offset of result_vect
581  for (std::size_t k = 0; k < current_incr.size(); ++k) {
582  // check if we need modify result_offset
583  if (current_incr[k]) {
584  --current_incr[k];
585  break;
586  }
587 
588  current_incr[k] = before_incr[k];
589 
590  // here we shall modify result_offset
591  --result_value[k];
592 
593  if (result_value[k]) {
594  the_result_offset += result_offset[k];
595  break;
596  }
597 
    // variable k has visited all its values: reset it to its first value
    // and carry on with the next variable (no break)
598  result_value[k] = result_domain[k];
599  the_result_offset -= result_down[k];
600  }
601  }
602 
603  // save the subset_ids and the result vector
604  try {
605  __last_nonDB_ids = subset_ids;
606  __last_nonDB_countings = std::move(result_vect);
607  return __last_nonDB_countings;
608  } catch (...) {
609  __last_nonDB_ids.clear();
610  __last_nonDB_countings.clear();
611  throw;
612  }
613  }
614 
615 
/// Parses the database to compute the counts over the variables of ids.
///
/// The rows of each range of __thread_ranges are parsed by one parser; the
/// weight of each row is added to the cell of a per-thread counting vector
/// determined by the values of the columns of ids. The per-thread vectors
/// are then summed and the result is saved into __last_DB_ids /
/// __last_DB_countings.
///
/// @param ids the ids of the variables over which counts are computed
/// @return a reference to __last_DB_countings or, if ids, the database or
/// __thread_ranges is empty, to the (emptied) __last_nonDB_countings
617  template < template < typename > class ALLOC >
618  std::vector< double, ALLOC< double > >&
619  RecordCounter< ALLOC >::__countFromDatabase(const IdSet< ALLOC >& ids) {
620  // if the ids vector is empty or the database is empty, return an
621  // empty vector
622  const auto& database = __parsers[0].data.database();
623  if (ids.empty() || database.empty() || __thread_ranges.empty()) {
624  __last_nonDB_countings.clear();
625  __last_nonDB_ids.clear();
626  return __last_nonDB_countings;
627  }
628 
629  // we translate the ids into their corresponding columns in the
630  // DatabaseTable
631  const auto nodeId2columns = __getNodeIds2Columns(ids);
632 
633  // we first determine the size of the counting vector, the domain of
634  // each of its variables and their offsets in the output vector
    // (the offset of the first id is 1, that of each subsequent id is the
    // product of the domain sizes of the preceding ids)
635  const std::size_t ids_size = ids.size();
636  std::size_t counting_vect_size = std::size_t(1);
637  std::vector< std::size_t, ALLOC< std::size_t > > domain_sizes(ids_size);
638  std::vector< std::pair< std::size_t, std::size_t >,
639  ALLOC< std::pair< std::size_t, std::size_t > > >
640  cols_offsets(ids_size);
641  {
642  std::size_t i = std::size_t(0);
643  for (const auto id : ids) {
644  const std::size_t domain_size = database.domainSize(nodeId2columns[id]);
645  domain_sizes[i] = domain_size;
646  cols_offsets[i].first = nodeId2columns[id];
647  cols_offsets[i].second = counting_vect_size;
648  counting_vect_size *= domain_size;
649  ++i;
650  }
651  }
652 
653  // we sort the columns and offsets by increasing column index. This
654  // may speed up threaded countings by improving the cacheline hits
655  std::sort(cols_offsets.begin(),
656  cols_offsets.end(),
657  [](const std::pair< std::size_t, std::size_t >& a,
658  const std::pair< std::size_t, std::size_t >& b) -> bool {
659  return a.first < b.first;
660  });
661 
662  // create parsers if needed
    // (one parser per thread is required; new ones are copies of
    // __parsers[0])
663  const std::size_t nb_ranges = __thread_ranges.size();
664  const std::size_t nb_threads =
665  nb_ranges <= __max_nb_threads ? nb_ranges : __max_nb_threads;
666  while (__parsers.size() < nb_threads) {
667  ThreadData< DBRowGeneratorParser< ALLOC > > new_parser(__parsers[0]);
668  __parsers.push_back(std::move(new_parser));
669  }
670 
671  // set the columns of interest for each parser. This specifies to the
672  // parser which columns are used for the countings. This is important
673  // for parsers like the EM parser that complete unobserved variables.
674  std::vector< std::size_t, ALLOC< std::size_t > > cols_of_interest(ids_size);
675  for (std::size_t i = std::size_t(0); i < ids_size; ++i) {
676  cols_of_interest[i] = cols_offsets[i].first;
677  }
678  for (auto& parser : __parsers) {
679  parser.data.setColumnsOfInterest(cols_of_interest);
680  }
681 
682  // allocate all the counting vectors, including that which will add
683  // all the results provided by the threads. We initialize once and
684  // for all these vectors with zeroes
685  std::vector< double, ALLOC< double > > counting_vect(counting_vect_size,
686  0.0);
687  std::vector< ThreadData< std::vector< double, ALLOC< double > > >,
688  ALLOC< ThreadData< std::vector< double, ALLOC< double > > > > >
689  thread_countings(
690  nb_threads,
691  ThreadData< std::vector< double, ALLOC< double > > >(counting_vect));
692 
693  // launch the threads
694  // here we use openMP for launching the threads because, experimentally,
695  // it seems to provide results that are twice as fast as the results
696  // with the std::thread
    // the ranges are processed by rounds of nb_threads ranges: in the round
    // starting at index i, thread number t parses range number t + i
    // NOTE(review): this presumes that the parallel region really runs
    // nb_threads threads; if the OpenMP runtime provided fewer threads, the
    // ranges of the missing thread numbers would apparently not be parsed
    // -- to be confirmed
697  for (std::size_t i = std::size_t(0); i < nb_ranges; i += nb_threads) {
698 # pragma omp parallel num_threads(int(nb_threads))
699  {
700  // get the number of the thread
701  const std::size_t this_thread = getThreadNumber();
702  if (this_thread + i < nb_ranges) {
703  DBRowGeneratorParser< ALLOC >& parser = __parsers[this_thread].data;
704  parser.setRange(__thread_ranges[this_thread + i].first,
705  __thread_ranges[this_thread + i].second);
706  std::vector< double, ALLOC< double > >& countings =
707  thread_countings[this_thread].data;
708 
709  // parse the database
710  try {
711  while (parser.hasRows()) {
712  // get the observed rows
713  const DBRow< DBTranslatedValue >& row = parser.row();
714 
715  // fill the counts for the current row
    // (the index i declared below shadows the index i of the enclosing
    // loop over the rounds; it only iterates over the columns)
716  std::size_t offset = std::size_t(0);
717  for (std::size_t i = std::size_t(0); i < ids_size; ++i) {
718  offset +=
719  row[cols_offsets[i].first].discr_val * cols_offsets[i].second;
720  }
721 
722  countings[offset] += row.weight();
723  }
724  } catch (NotFound&) {} // this exception is raised by the row filter
725  // if the row generators create no output row
726  // from the last rows of the database
727  }
728  }
729  }
730 
731 
732  // add the counts to counting_vect
733  for (std::size_t k = std::size_t(0); k < nb_threads; ++k) {
734  const auto& thread_counting = thread_countings[k].data;
735  for (std::size_t r = std::size_t(0); r < counting_vect_size; ++r) {
736  counting_vect[r] += thread_counting[r];
737  }
738  }
739 
740  // save the final results
741  __last_DB_ids = ids;
742  __last_DB_countings = std::move(counting_vect);
743 
744  return __last_DB_countings;
745  }
746 
747 
749  template < template < typename > class ALLOC >
750  void RecordCounter< ALLOC >::__threadedCount(
751  const std::size_t begin,
752  const std::size_t end,
753  DBRowGeneratorParser< ALLOC >& parser,
754  const std::vector< std::pair< std::size_t, std::size_t >,
755  ALLOC< std::pair< std::size_t, std::size_t > > >&
756  cols_offsets,
757  std::vector< double, ALLOC< double > >& countings) {
758  parser.setRange(begin, end);
759 
760  try {
761  const std::size_t nb_columns = cols_offsets.size();
762  while (parser.hasRows()) {
763  // get the observed filtered rows
764  const DBRow< DBTranslatedValue >& row = parser.row();
765 
766  // fill the counts for the current row
767  std::size_t offset = std::size_t(0);
768  for (std::size_t i = std::size_t(0); i < nb_columns; ++i) {
769  offset +=
770  row[cols_offsets[i].first].discr_val * cols_offsets[i].second;
771  }
772 
773  countings[offset] += row.weight();
774  }
775  } catch (NotFound&) {} // this exception is raised by the row filter if the
776  // row generators create no output row from the last
777  // rows of the database
778  }
779 
780 
782  template < template < typename > class ALLOC >
783  template < template < typename > class XALLOC >
784  void RecordCounter< ALLOC >::__checkRanges(
785  const std::vector< std::pair< std::size_t, std::size_t >,
786  XALLOC< std::pair< std::size_t, std::size_t > > >&
787  new_ranges) const {
788  const std::size_t dbsize = __parsers[0].data.database().nbRows();
789  std::vector< std::pair< std::size_t, std::size_t >,
790  ALLOC< std::pair< std::size_t, std::size_t > > >
791  incorrect_ranges;
792  for (const auto& range : new_ranges) {
793  if ((range.first >= range.second) || (range.second > dbsize)) {
794  incorrect_ranges.push_back(range);
795  }
796  }
797  if (!incorrect_ranges.empty()) {
798  std::stringstream str;
799  str << "It is impossible to set the ranges because the following one";
800  if (incorrect_ranges.size() > 1)
801  str << "s are incorrect: ";
802  else
803  str << " is incorrect: ";
804  bool deja = false;
805  for (const auto& range : incorrect_ranges) {
806  if (deja)
807  str << ", ";
808  else
809  deja = true;
810  str << '[' << range.first << ';' << range.second << ')';
811  }
812 
813  GUM_ERROR(OutOfBounds, str.str());
814  }
815  }
816 
817 
819  template < template < typename > class ALLOC >
820  void RecordCounter< ALLOC >::__dispatchRangesToThreads() {
821  __thread_ranges.clear();
822 
823  // ensure that __ranges contains the ranges asked by the user
824  bool add_range = false;
825  if (__ranges.empty()) {
826  const auto& database = __parsers[0].data.database();
827  __ranges.push_back(std::pair< std::size_t, std::size_t >(
828  std::size_t(0), database.nbRows()));
829  add_range = true;
830  }
831 
832  // dispatch the ranges
833  for (const auto& range : __ranges) {
834  if (range.second > range.first) {
835  const std::size_t range_size = range.second - range.first;
836  std::size_t nb_threads = range_size / __min_nb_rows_per_thread;
837  if (nb_threads < 1)
838  nb_threads = 1;
839  else if (nb_threads > __max_nb_threads)
840  nb_threads = __max_nb_threads;
841  std::size_t nb_rows_par_thread = range_size / nb_threads;
842  std::size_t rest_rows = range_size - nb_rows_par_thread * nb_threads;
843 
844  std::size_t begin_index = range.first;
845  for (std::size_t i = std::size_t(0); i < nb_threads; ++i) {
846  std::size_t end_index = begin_index + nb_rows_par_thread;
847  if (rest_rows != std::size_t(0)) {
848  ++end_index;
849  --rest_rows;
850  }
851  __thread_ranges.push_back(
852  std::pair< std::size_t, std::size_t >(begin_index, end_index));
853  begin_index = end_index;
854  }
855  }
856  }
857  if (add_range) __ranges.clear();
858 
859  // sort ranges by decreasing range size, so that if the number of
860  // ranges exceeds the number of threads allowed, we start a first round of
861  // threads with the highest range, then another round with lower ranges,
862  // and so on until all the ranges have been processed
863  std::sort(__thread_ranges.begin(),
864  __thread_ranges.end(),
865  [](const std::pair< std::size_t, std::size_t >& a,
866  const std::pair< std::size_t, std::size_t >& b) -> bool {
867  return (a.second - a.first) > (b.second - b.first);
868  });
869  }
870 
871 
873  template < template < typename > class ALLOC >
874  template < template < typename > class XALLOC >
876  const std::vector< std::pair< std::size_t, std::size_t >,
877  XALLOC< std::pair< std::size_t, std::size_t > > >&
878  new_ranges) {
879  // first, we check that all ranges are within the database's bounds
880  __checkRanges(new_ranges);
881 
882  // since the ranges are OK, save them and clear the counting caches
883  const std::size_t new_size = new_ranges.size();
884  std::vector< std::pair< std::size_t, std::size_t >,
885  ALLOC< std::pair< std::size_t, std::size_t > > >
886  ranges(new_size);
887  for (std::size_t i = std::size_t(0); i < new_size; ++i) {
888  ranges[i].first = new_ranges[i].first;
889  ranges[i].second = new_ranges[i].second;
890  }
891 
892  clear();
893  __ranges = std::move(ranges);
894 
895  // dispatch the ranges to the threads
896  __dispatchRangesToThreads();
897  }
898 
899 
901  template < template < typename > class ALLOC >
903  if (__ranges.empty()) return;
904  clear();
905  __ranges.clear();
906  __dispatchRangesToThreads();
907  }
908 
909 
911  template < template < typename > class ALLOC >
912  INLINE const std::vector< std::pair< std::size_t, std::size_t >,
913  ALLOC< std::pair< std::size_t, std::size_t > > >&
915  return __ranges;
916  }
917 
918 
920  template < template < typename > class ALLOC >
921  template < typename GUM_SCALAR >
922  INLINE void
923  RecordCounter< ALLOC >::setBayesNet(const BayesNet< GUM_SCALAR >& new_bn) {
924  // remove the caches
925  clear();
926 
927  // assign the new BN
928  for (auto& xparser : __parsers) {
929  xparser.data.setBayesNet(new_bn);
930  }
931  }
932 
933  } /* namespace learning */
934 
935 } /* namespace gum */
936 
937 #endif /* DOXYGEN_SHOULD_SKIP_THIS */
bool isOMP()
Is OMP active ?
virtual RecordCounter< ALLOC > * clone() const
virtual copy constructor
unsigned int getThreadNumber()
Get the calling thread id.
const Bijection< NodeId, std::size_t, ALLOC< std::size_t > > & nodeId2Columns() const
returns the mapping from ids to column positions in the database
STL namespace.
void clearRanges()
reset the ranges to the one range corresponding to the whole database
std::size_t nbThreads() const
returns the number of threads used to parse the database
allocator_type getAllocator() const
returns the allocator used
gum is the global namespace for all aGrUM entities
Definition: agrum.h:25
void setMaxNbThreads(const std::size_t nb) const
changes the max number of threads used to parse the database
const std::vector< std::pair< std::size_t, std::size_t >, ALLOC< std::pair< std::size_t, std::size_t > > > & ranges() const
returns the current ranges
void clear()
clears all the last database-parsed countings from memory
std::size_t minNbRowsPerThread() const
returns the minimum of rows that each thread should process
The class that computes countings of observations from the database.
const std::vector< double, ALLOC< double > > & counts(const IdSet< ALLOC > &ids, const bool check_discrete_vars=false)
returns the counts over all the variables in an IdSet
ALLOC< NodeId > allocator_type
type for the allocators passed in arguments of methods
void setMinNbRowsPerThread(const std::size_t nb) const
changes the minimum number of rows a thread should process in a multithreading context
virtual ~RecordCounter()
destructor
void setRanges(const std::vector< std::pair< std::size_t, std::size_t >, XALLOC< std::pair< std::size_t, std::size_t > > > &new_ranges)
sets new ranges to perform the countings
RecordCounter< ALLOC > & operator=(const RecordCounter< ALLOC > &from)
copy operator
RecordCounter(const DBRowGeneratorParser< ALLOC > &parser, const std::vector< std::pair< std::size_t, std::size_t >, ALLOC< std::pair< std::size_t, std::size_t > > > &ranges, const Bijection< NodeId, std::size_t, ALLOC< std::size_t > > &nodeId2columns=Bijection< NodeId, std::size_t, ALLOC< std::size_t > >(), const allocator_type &alloc=allocator_type())
default constructor
Size NodeId
Type for node ids.
Definition: graphElements.h:97
#define GUM_ERROR(type, msg)
Definition: exceptions.h:52
void setBayesNet(const BayesNet< GUM_SCALAR > &new_bn)
assign a new Bayes net to all the counter's generators depending on a BN
const DatabaseTable< ALLOC > & database() const
returns the database on which we perform the counts