aGrUM  0.16.0
recordCounter_tpl.h
Go to the documentation of this file.
1 
30 
31 
32 #ifndef DOXYGEN_SHOULD_SKIP_THIS
33 
34 namespace gum {
35 
36  namespace learning {
37 
38 
40  template < template < typename > class ALLOC >
43  return __parsers.get_allocator();
44  }
45 
46 
48  template < template < typename > class ALLOC >
50  const DBRowGeneratorParser< ALLOC >& parser,
51  const std::vector< std::pair< std::size_t, std::size_t >,
52  ALLOC< std::pair< std::size_t, std::size_t > > >& ranges,
53  const Bijection< NodeId, std::size_t, ALLOC< std::size_t > >&
54  nodeId2columns,
55  const typename RecordCounter< ALLOC >::allocator_type& alloc) :
56  __parsers(alloc),
57  __ranges(alloc), __nodeId2columns(nodeId2columns),
58  __last_DB_countings(alloc), __last_DB_ids(alloc),
59  __last_nonDB_countings(alloc), __last_nonDB_ids(alloc) {
60  // check that the columns in nodeId2columns do belong to the database
61  const std::size_t db_nb_cols = parser.database().nbVariables();
62  for (auto iter = nodeId2columns.cbegin(); iter != nodeId2columns.cend();
63  ++iter) {
64  if (iter.second() >= db_nb_cols) {
65  GUM_ERROR(OutOfBounds,
66  "the mapping between ids and database columns "
67  << "is incorrect because Column " << iter.second()
68  << " does not belong to the database.");
69  }
70  }
71 
72  // create the parsers. There should always be at least one parser
73  if (__max_nb_threads < std::size_t(1)) __max_nb_threads = std::size_t(1);
74  __parsers.reserve(__max_nb_threads);
75  for (std::size_t i = std::size_t(0); i < __max_nb_threads; ++i)
76  __parsers.push_back(parser);
77 
78  // check that the ranges are within the bounds of the database and
79  // save them
80  __checkRanges(ranges);
81  __ranges.reserve(ranges.size());
82  for (const auto& range : ranges)
83  __ranges.push_back(range);
84 
85  // dispatch the ranges for the threads
86  __dispatchRangesToThreads();
87 
88  GUM_CONSTRUCTOR(RecordCounter);
89  }
90 
91 
93  template < template < typename > class ALLOC >
95  const DBRowGeneratorParser< ALLOC >& parser,
96  const Bijection< NodeId, std::size_t, ALLOC< std::size_t > >&
97  nodeId2columns,
98  const typename RecordCounter< ALLOC >::allocator_type& alloc) :
99  RecordCounter< ALLOC >(
100  parser,
101  std::vector< std::pair< std::size_t, std::size_t >,
102  ALLOC< std::pair< std::size_t, std::size_t > > >(),
103  nodeId2columns,
104  alloc) {}
105 
106 
108  template < template < typename > class ALLOC >
110  const RecordCounter< ALLOC >& from,
111  const typename RecordCounter< ALLOC >::allocator_type& alloc) :
112  __parsers(from.__parsers, alloc),
113  __ranges(from.__ranges, alloc),
114  __thread_ranges(from.__thread_ranges, alloc),
115  __nodeId2columns(from.__nodeId2columns),
116  __last_DB_countings(from.__last_DB_countings, alloc),
117  __last_DB_ids(from.__last_DB_ids),
118  __last_nonDB_countings(from.__last_nonDB_countings, alloc),
119  __last_nonDB_ids(from.__last_nonDB_ids),
120  __max_nb_threads(from.__max_nb_threads),
121  __min_nb_rows_per_thread(from.__min_nb_rows_per_thread) {
122  GUM_CONS_CPY(RecordCounter);
123  }
124 
125 
127  template < template < typename > class ALLOC >
128  RecordCounter< ALLOC >::RecordCounter(const RecordCounter< ALLOC >& from) :
129  RecordCounter< ALLOC >(from, from.getAllocator()) {}
130 
131 
133  template < template < typename > class ALLOC >
135  RecordCounter< ALLOC >&& from,
136  const typename RecordCounter< ALLOC >::allocator_type& alloc) :
137  __parsers(std::move(from.__parsers), alloc),
138  __ranges(std::move(from.__ranges), alloc),
139  __thread_ranges(std::move(from.__thread_ranges), alloc),
140  __nodeId2columns(std::move(from.__nodeId2columns)),
141  __last_DB_countings(std::move(from.__last_DB_countings), alloc),
142  __last_DB_ids(std::move(from.__last_DB_ids)),
143  __last_nonDB_countings(std::move(from.__last_nonDB_countings), alloc),
144  __last_nonDB_ids(std::move(from.__last_nonDB_ids)),
145  __max_nb_threads(from.__max_nb_threads),
146  __min_nb_rows_per_thread(from.__min_nb_rows_per_thread) {
147  GUM_CONS_MOV(RecordCounter);
148  }
149 
150 
152  template < template < typename > class ALLOC >
153  RecordCounter< ALLOC >::RecordCounter(RecordCounter< ALLOC >&& from) :
154  RecordCounter< ALLOC >(std::move(from), from.getAllocator()) {}
155 
156 
158  template < template < typename > class ALLOC >
159  RecordCounter< ALLOC >* RecordCounter< ALLOC >::clone(
160  const typename RecordCounter< ALLOC >::allocator_type& alloc) const {
161  ALLOC< RecordCounter< ALLOC > > allocator(alloc);
162  RecordCounter< ALLOC >* new_counter = allocator.allocate(1);
163  try {
164  allocator.construct(new_counter, *this, alloc);
165  } catch (...) {
166  allocator.deallocate(new_counter, 1);
167  throw;
168  }
169 
170  return new_counter;
171  }
172 
173 
175  template < template < typename > class ALLOC >
176  RecordCounter< ALLOC >* RecordCounter< ALLOC >::clone() const {
177  return clone(this->getAllocator());
178  }
179 
180 
182  template < template < typename > class ALLOC >
184  GUM_DESTRUCTOR(RecordCounter);
185  }
186 
187 
189  template < template < typename > class ALLOC >
190  RecordCounter< ALLOC >& RecordCounter< ALLOC >::
191  operator=(const RecordCounter< ALLOC >& from) {
192  if (this != &from) {
193  __parsers = from.__parsers;
194  __ranges = from.__ranges;
195  __thread_ranges = from.__thread_ranges;
196  __nodeId2columns = from.__nodeId2columns;
197  __last_DB_countings = from.__last_DB_countings;
198  __last_DB_ids = from.__last_DB_ids;
199  __last_nonDB_countings = from.__last_nonDB_countings;
200  __last_nonDB_ids = from.__last_nonDB_ids;
201  __max_nb_threads = from.__max_nb_threads;
202  __min_nb_rows_per_thread = from.__min_nb_rows_per_thread;
203  }
204  return *this;
205  }
206 
207 
209  template < template < typename > class ALLOC >
210  RecordCounter< ALLOC >& RecordCounter< ALLOC >::
211  operator=(RecordCounter< ALLOC >&& from) {
212  if (this != &from) {
213  __parsers = std::move(from.__parsers);
214  __ranges = std::move(from.__ranges);
215  __thread_ranges = std::move(from.__thread_ranges);
216  __nodeId2columns = std::move(from.__nodeId2columns);
217  __last_DB_countings = std::move(from.__last_DB_countings);
218  __last_DB_ids = std::move(from.__last_DB_ids);
219  __last_nonDB_countings = std::move(from.__last_nonDB_countings);
220  __last_nonDB_ids = std::move(from.__last_nonDB_ids);
221  __max_nb_threads = from.__max_nb_threads;
222  __min_nb_rows_per_thread = from.__min_nb_rows_per_thread;
223  }
224  return *this;
225  }
226 
227 
229  template < template < typename > class ALLOC >
231  __last_DB_countings.clear();
232  __last_DB_ids.clear();
233  __last_nonDB_countings.clear();
234  __last_nonDB_ids.clear();
235  }
236 
237 
239  template < template < typename > class ALLOC >
240  void RecordCounter< ALLOC >::setMaxNbThreads(const std::size_t nb) const {
241  if (nb == std::size_t(0) || !isOMP())
242  __max_nb_threads = std::size_t(1);
243  else
244  __max_nb_threads = nb;
245  }
246 
247 
249  template < template < typename > class ALLOC >
250  INLINE std::size_t RecordCounter< ALLOC >::nbThreads() const {
251  return __max_nb_threads;
252  }
253 
254 
255  // changes the number min of rows a thread should process in a
256  // multithreading context
257  template < template < typename > class ALLOC >
258  void
259  RecordCounter< ALLOC >::setMinNbRowsPerThread(const std::size_t nb) const {
260  if (nb == std::size_t(0))
261  __min_nb_rows_per_thread = std::size_t(1);
262  else
263  __min_nb_rows_per_thread = nb;
264  }
265 
266 
268  template < template < typename > class ALLOC >
269  INLINE std::size_t RecordCounter< ALLOC >::minNbRowsPerThread() const {
270  return __min_nb_rows_per_thread;
271  }
272 
273 
275  template < template < typename > class ALLOC >
276  void RecordCounter< ALLOC >::__raiseCheckException(
277  const std::vector< std::string, ALLOC< std::string > >& bad_vars) const {
278  // generate the exception
279  std::stringstream msg;
280  msg << "Counts cannot be performed on continuous variables. ";
281  msg << "Unfortunately the following variable";
282  if (bad_vars.size() == 1)
283  msg << " is continuous: " << bad_vars[0];
284  else {
285  msg << "s are continuous: ";
286  bool deja = false;
287  for (const auto& name : bad_vars) {
288  if (deja)
289  msg << ", ";
290  else
291  deja = true;
292  msg << name;
293  }
294  }
295  GUM_ERROR(TypeError, msg.str());
296  }
297 
298 
300  template < template < typename > class ALLOC >
301  void RecordCounter< ALLOC >::__checkDiscreteVariables(
302  const IdSet< ALLOC >& ids) const {
303  const std::size_t size = ids.size();
304  const DatabaseTable< ALLOC >& database = __parsers[0].data.database();
305 
306  if (__nodeId2columns.empty()) {
307  // check all the ids
308  for (std::size_t i = std::size_t(0); i < size; ++i) {
309  if (database.variable(i).varType() == VarType::Continuous) {
310  // here, var i does not correspond to a discrete variable.
311  // we check whether there are other non discrete variables, so that
312  // we can generate an exception mentioning all these variables
313  std::vector< std::string, ALLOC< std::string > > bad_vars{
314  database.variable(i).name()};
315  for (++i; i < size; ++i) {
316  if (database.variable(i).varType() == VarType::Continuous)
317  bad_vars.push_back(database.variable(i).name());
318  }
319  __raiseCheckException(bad_vars);
320  }
321  }
322  } else {
323  // check all the ids
324  for (std::size_t i = std::size_t(0); i < size; ++i) {
325  // get the position of the variable in the database
326  std::size_t pos = __nodeId2columns.second(ids[i]);
327 
328  if (database.variable(pos).varType() == VarType::Continuous) {
329  // here, id does not correspond to a discrete variable.
330  // we check whether there are other non discrete variables, so that
331  // we can generate an exception mentioning all these variables
332  std::vector< std::string, ALLOC< std::string > > bad_vars{
333  database.variable(pos).name()};
334  for (++i; i < size; ++i) {
335  pos = __nodeId2columns.second(ids[i]);
336  if (database.variable(pos).varType() == VarType::Continuous)
337  bad_vars.push_back(database.variable(pos).name());
338  }
339  __raiseCheckException(bad_vars);
340  }
341  }
342  }
343  }
344 
345 
347  template < template < typename > class ALLOC >
348  INLINE const Bijection< NodeId, std::size_t, ALLOC< std::size_t > >&
350  return __nodeId2columns;
351  }
352 
353 
355  template < template < typename > class ALLOC >
356  const DatabaseTable< ALLOC >& RecordCounter< ALLOC >::database() const {
357  return __parsers[0].data.database();
358  }
359 
360 
362  template < template < typename > class ALLOC >
363  INLINE const std::vector< double, ALLOC< double > >&
364  RecordCounter< ALLOC >::counts(const IdSet< ALLOC >& ids,
365  const bool check_discrete_vars) {
366  // if the idset is empty, return an empty vector
367  if (ids.empty()) {
368  __last_nonDB_ids.clear();
369  __last_nonDB_countings.clear();
370  return __last_nonDB_countings;
371  }
372 
373  // check whether we can extract the vector we wish to return from
374  // some already computed counting vector
375  if (__last_nonDB_ids.contains(ids))
376  return __extractFromCountings(
377  ids, __last_nonDB_ids, __last_nonDB_countings);
378  else if (__last_DB_ids.contains(ids))
379  return __extractFromCountings(ids, __last_DB_ids, __last_DB_countings);
380  else {
381  if (check_discrete_vars) __checkDiscreteVariables(ids);
382  return __countFromDatabase(ids);
383  }
384  }
385 
386 
387  // returns a mapping from the nodes ids to the columns of the database
388  // for a given sequence of ids
389  template < template < typename > class ALLOC >
390  HashTable< NodeId, std::size_t > RecordCounter< ALLOC >::__getNodeIds2Columns(
391  const IdSet< ALLOC >& ids) const {
392  HashTable< NodeId, std::size_t > res(ids.size());
393  if (__nodeId2columns.empty()) {
394  for (const auto id : ids) {
395  res.insert(id, std::size_t(id));
396  }
397  } else {
398  for (const auto id : ids) {
399  res.insert(id, __nodeId2columns.second(id));
400  }
401  }
402  return res;
403  }
404 
405 
407  template < template < typename > class ALLOC >
408  INLINE std::vector< double, ALLOC< double > >&
409  RecordCounter< ALLOC >::__extractFromCountings(
410  const IdSet< ALLOC >& subset_ids,
411  const IdSet< ALLOC >& superset_ids,
412  const std::vector< double, ALLOC< double > >& superset_vect) {
413  // get a mapping between the node Ids and their columns in the database.
414  // This should be stored into __nodeId2columns, except if the latter is
415  // empty, in which case there is an identity mapping
416  const auto nodeId2columns = __getNodeIds2Columns(superset_ids);
417 
418  // we first determine the size of the output vector, the domain of
419  // each of its variables and their offsets in the output vector
420  const auto& database = __parsers[0].data.database();
421  std::size_t result_vect_size = std::size_t(1);
422  for (const auto id : subset_ids) {
423  result_vect_size *= database.domainSize(nodeId2columns[id]);
424  }
425 
426  // we create the output vector
427  const std::size_t subset_ids_size = std::size_t(subset_ids.size());
428  std::vector< double, ALLOC< double > > result_vect(result_vect_size, 0.0);
429 
430 
431  // check if the subset_ids is the beginning of the sequence of superset_ids
432  // if this is the case, then we can outer loop over the variables not in
433  // subset_ids and, for each iteration of this loop add a vector of size
434  // result_size to result_vect
435  bool subset_begin = true;
436  for (std::size_t i = 0; i < subset_ids_size; ++i) {
437  if (superset_ids.pos(subset_ids[i]) != i) {
438  subset_begin = false;
439  break;
440  }
441  }
442 
443  if (subset_begin) {
444  const std::size_t superset_vect_size = superset_vect.size();
445  std::size_t i = std::size_t(0);
446  while (i < superset_vect_size) {
447  for (std::size_t j = std::size_t(0); j < result_vect_size; ++j, ++i) {
448  result_vect[j] += superset_vect[i];
449  }
450  }
451 
452  // save the subset_ids and the result vector
453  try {
454  __last_nonDB_ids = subset_ids;
455  __last_nonDB_countings = std::move(result_vect);
456  return __last_nonDB_countings;
457  } catch (...) {
458  __last_nonDB_ids.clear();
459  __last_nonDB_countings.clear();
460  throw;
461  }
462  }
463 
464 
465  // check if subset_ids is the end of the sequence of superset_ids.
466  // In this case, as above, there are two simple loops to perform the
467  // countings
468  bool subset_end = true;
469  const std::size_t superset_ids_size = std::size_t(superset_ids.size());
470  for (std::size_t i = 0; i < subset_ids_size; ++i) {
471  if (superset_ids.pos(subset_ids[i])
472  != i + superset_ids_size - subset_ids_size) {
473  subset_end = false;
474  break;
475  }
476  }
477 
478  if (subset_end) {
479  // determine the size of the vector corresponding to the variables
480  // not belonging to subset_ids
481  std::size_t vect_not_subset_size = std::size_t(1);
482  for (std::size_t i = std::size_t(0);
483  i < superset_ids_size - subset_ids_size;
484  ++i)
485  vect_not_subset_size *=
486  database.domainSize(nodeId2columns[superset_ids[i]]);
487 
488  // perform the two loops
489  std::size_t i = std::size_t(0);
490  for (std::size_t j = std::size_t(0); j < result_vect_size; ++j) {
491  for (std::size_t k = std::size_t(0); k < vect_not_subset_size;
492  ++k, ++i) {
493  result_vect[j] += superset_vect[i];
494  }
495  }
496 
497  // save the subset_ids and the result vector
498  try {
499  __last_nonDB_ids = subset_ids;
500  __last_nonDB_countings = std::move(result_vect);
501  return __last_nonDB_countings;
502  } catch (...) {
503  __last_nonDB_ids.clear();
504  __last_nonDB_countings.clear();
505  throw;
506  }
507  }
508 
509 
510  // here subset_ids is a subset of superset_ids neither prefixing nor
511  // postfixing it. So the computation is somewhat more complicated.
512 
513  // We will parse the superset_vect sequentially (using ++ operator).
514  // Sometimes, we will need to change the offset of the cell of result_vect
515  // that will be affected, sometimes not. Vector before_incr will indicate
516  // whether we need to change the offset (value = 0) or not (value different
517  // from 0). Vectors result_domain will indicate how this offset should be
518  // computed. Here is an example of the values of these vectors. Assume that
519  // superset_ids = <A,B,C,D,E> and subset_ids = <A,D,C>. Then, the three
520  // vectors before_incr, result_domain and result_offset are indexed w.r.t.
521  // A,C,D, i.e., w.r.t. to the variables in subset_ids but order w.r.t.
522  // superset_ids (this is convenient as we will parse superset_vect
523  // sequentially. For a variable or a set of variables X, let M_X denote the
524  // domain size of X. Then the contents of the three vectors are as follows:
525  // before_incr = {0, M_B, 0} (this means that whenever we iterate over B's
526  // values, the offset in result_vect does not change)
527  // result_domain = { M_A, M_C, M_D } (i.e., the domain sizes of the variables
528  // in subset_ids, order w.r.t. superset_ids)
529  // result_offset = { 1, M_A*M_D, M_A } (this corresponds to the offsets
530  // in result_vect of variables A, C and D)
531  // Vector superset_order = { 0, 2, 1} : this is a map from the indices of
532  // the variables in subset_ids to the indices of these variables in the
533  // three vectors described above. For instance, the "2" means that variable
534  // D (which is at index 1 in subset_ids) is located at index 2 in vector
535  // before_incr
536  std::vector< std::size_t > before_incr(subset_ids_size);
537  std::vector< std::size_t > result_domain(subset_ids_size);
538  std::vector< std::size_t > result_offset(subset_ids_size);
539  {
540  std::size_t result_domain_size = std::size_t(1);
541  std::size_t tmp_before_incr = std::size_t(1);
542  std::vector< std::size_t > superset_order(subset_ids_size);
543 
544  for (std::size_t h = std::size_t(0), j = std::size_t(0);
545  j < subset_ids_size;
546  ++h) {
547  if (subset_ids.exists(superset_ids[h])) {
548  before_incr[j] = tmp_before_incr - 1;
549  superset_order[subset_ids.pos(superset_ids[h])] = j;
550  tmp_before_incr = 1;
551  ++j;
552  } else {
553  tmp_before_incr *=
554  database.domainSize(nodeId2columns[superset_ids[h]]);
555  }
556  }
557 
558  // compute the offsets in the order of the superset_ids
559  for (std::size_t i = 0; i < subset_ids.size(); ++i) {
560  const std::size_t domain_size =
561  database.domainSize(nodeId2columns[subset_ids[i]]);
562  const std::size_t j = superset_order[i];
563  result_domain[j] = domain_size;
564  result_offset[j] = result_domain_size;
565  result_domain_size *= domain_size;
566  }
567  }
568 
569  std::vector< std::size_t > result_value(result_domain);
570  std::vector< std::size_t > current_incr(before_incr);
571  std::vector< std::size_t > result_down(result_offset);
572 
573  for (std::size_t j = std::size_t(0); j < result_down.size(); ++j) {
574  result_down[j] *= (result_domain[j] - 1);
575  }
576 
577  // now we can loop over the superset_vect to fill result_vect
578  const std::size_t superset_vect_size = superset_vect.size();
579  std::size_t the_result_offset = std::size_t(0);
580  for (std::size_t h = std::size_t(0); h < superset_vect_size; ++h) {
581  result_vect[the_result_offset] += superset_vect[h];
582 
583  // update the offset of result_vect
584  for (std::size_t k = 0; k < current_incr.size(); ++k) {
585  // check if we need modify result_offset
586  if (current_incr[k]) {
587  --current_incr[k];
588  break;
589  }
590 
591  current_incr[k] = before_incr[k];
592 
593  // here we shall modify result_offset
594  --result_value[k];
595 
596  if (result_value[k]) {
597  the_result_offset += result_offset[k];
598  break;
599  }
600 
601  result_value[k] = result_domain[k];
602  the_result_offset -= result_down[k];
603  }
604  }
605 
606  // save the subset_ids and the result vector
607  try {
608  __last_nonDB_ids = subset_ids;
609  __last_nonDB_countings = std::move(result_vect);
610  return __last_nonDB_countings;
611  } catch (...) {
612  __last_nonDB_ids.clear();
613  __last_nonDB_countings.clear();
614  throw;
615  }
616  }
617 
618 
620  template < template < typename > class ALLOC >
621  std::vector< double, ALLOC< double > >&
622  RecordCounter< ALLOC >::__countFromDatabase(const IdSet< ALLOC >& ids) {
623  // if the ids vector is empty or the database is empty, return an
624  // empty vector
625  const auto& database = __parsers[0].data.database();
626  if (ids.empty() || database.empty() || __thread_ranges.empty()) {
627  __last_nonDB_countings.clear();
628  __last_nonDB_ids.clear();
629  return __last_nonDB_countings;
630  }
631 
632  // we translate the ids into their corresponding columns in the
633  // DatabaseTable
634  const auto nodeId2columns = __getNodeIds2Columns(ids);
635 
636  // we first determine the size of the counting vector, the domain of
637  // each of its variables and their offsets in the output vector
638  const std::size_t ids_size = ids.size();
639  std::size_t counting_vect_size = std::size_t(1);
640  std::vector< std::size_t, ALLOC< std::size_t > > domain_sizes(ids_size);
641  std::vector< std::pair< std::size_t, std::size_t >,
642  ALLOC< std::pair< std::size_t, std::size_t > > >
643  cols_offsets(ids_size);
644  {
645  std::size_t i = std::size_t(0);
646  for (const auto id : ids) {
647  const std::size_t domain_size = database.domainSize(nodeId2columns[id]);
648  domain_sizes[i] = domain_size;
649  cols_offsets[i].first = nodeId2columns[id];
650  cols_offsets[i].second = counting_vect_size;
651  counting_vect_size *= domain_size;
652  ++i;
653  }
654  }
655 
656  // we sort the columns and offsets by increasing column index. This
657  // may speed up threaded countings by improving the cacheline hits
658  std::sort(cols_offsets.begin(),
659  cols_offsets.end(),
660  [](const std::pair< std::size_t, std::size_t >& a,
661  const std::pair< std::size_t, std::size_t >& b) -> bool {
662  return a.first < b.first;
663  });
664 
665  // create parsers if needed
666  const std::size_t nb_ranges = __thread_ranges.size();
667  const std::size_t nb_threads =
668  nb_ranges <= __max_nb_threads ? nb_ranges : __max_nb_threads;
669  while (__parsers.size() < nb_threads) {
670  ThreadData< DBRowGeneratorParser< ALLOC > > new_parser(__parsers[0]);
671  __parsers.push_back(std::move(new_parser));
672  }
673 
674  // set the columns of interest for each parser. This specifies to the
675  // parser which columns are used for the countings. This is important
676  // for parsers like the EM parser that complete unobserved variables.
677  std::vector< std::size_t, ALLOC< std::size_t > > cols_of_interest(ids_size);
678  for (std::size_t i = std::size_t(0); i < ids_size; ++i) {
679  cols_of_interest[i] = cols_offsets[i].first;
680  }
681  for (auto& parser : __parsers) {
682  parser.data.setColumnsOfInterest(cols_of_interest);
683  }
684 
685  // allocate all the counting vectors, including that which will add
686  // all the results provided by the threads. We initialize once and
687  // for all these vectors with zeroes
688  std::vector< double, ALLOC< double > > counting_vect(counting_vect_size,
689  0.0);
690  std::vector< ThreadData< std::vector< double, ALLOC< double > > >,
691  ALLOC< ThreadData< std::vector< double, ALLOC< double > > > > >
692  thread_countings(
693  nb_threads,
694  ThreadData< std::vector< double, ALLOC< double > > >(counting_vect));
695 
696  // launch the threads
697  // here we use openMP for launching the threads because, experimentally,
698  // it seems to provide results that are twice as fast as the results
699  // with the std::thread
700  for (std::size_t i = std::size_t(0); i < nb_ranges; i += nb_threads) {
701 # pragma omp parallel num_threads(int(nb_threads))
702  {
703  // get the number of the thread
704  const std::size_t this_thread = getThreadNumber();
705  if (this_thread + i < nb_ranges) {
706  DBRowGeneratorParser< ALLOC >& parser = __parsers[this_thread].data;
707  parser.setRange(__thread_ranges[this_thread + i].first,
708  __thread_ranges[this_thread + i].second);
709  std::vector< double, ALLOC< double > >& countings =
710  thread_countings[this_thread].data;
711 
712  // parse the database
713  try {
714  while (parser.hasRows()) {
715  // get the observed rows
716  const DBRow< DBTranslatedValue >& row = parser.row();
717 
718  // fill the counts for the current row
719  std::size_t offset = std::size_t(0);
720  for (std::size_t i = std::size_t(0); i < ids_size; ++i) {
721  offset +=
722  row[cols_offsets[i].first].discr_val * cols_offsets[i].second;
723  }
724 
725  countings[offset] += row.weight();
726  }
727  } catch (NotFound&) {} // this exception is raised by the row filter
728  // if the row generators create no output row
729  // from the last rows of the database
730  }
731  }
732  }
733 
734 
735  // add the counts to counting_vect
736  for (std::size_t k = std::size_t(0); k < nb_threads; ++k) {
737  const auto& thread_counting = thread_countings[k].data;
738  for (std::size_t r = std::size_t(0); r < counting_vect_size; ++r) {
739  counting_vect[r] += thread_counting[r];
740  }
741  }
742 
743  // save the final results
744  __last_DB_ids = ids;
745  __last_DB_countings = std::move(counting_vect);
746 
747  return __last_DB_countings;
748  }
749 
750 
752  template < template < typename > class ALLOC >
753  void RecordCounter< ALLOC >::__threadedCount(
754  const std::size_t begin,
755  const std::size_t end,
756  DBRowGeneratorParser< ALLOC >& parser,
757  const std::vector< std::pair< std::size_t, std::size_t >,
758  ALLOC< std::pair< std::size_t, std::size_t > > >&
759  cols_offsets,
760  std::vector< double, ALLOC< double > >& countings) {
761  parser.setRange(begin, end);
762 
763  try {
764  const std::size_t nb_columns = cols_offsets.size();
765  while (parser.hasRows()) {
766  // get the observed filtered rows
767  const DBRow< DBTranslatedValue >& row = parser.row();
768 
769  // fill the counts for the current row
770  std::size_t offset = std::size_t(0);
771  for (std::size_t i = std::size_t(0); i < nb_columns; ++i) {
772  offset +=
773  row[cols_offsets[i].first].discr_val * cols_offsets[i].second;
774  }
775 
776  countings[offset] += row.weight();
777  }
778  } catch (NotFound&) {} // this exception is raised by the row filter if the
779  // row generators create no output row from the last
780  // rows of the database
781  }
782 
783 
785  template < template < typename > class ALLOC >
786  template < template < typename > class XALLOC >
787  void RecordCounter< ALLOC >::__checkRanges(
788  const std::vector< std::pair< std::size_t, std::size_t >,
789  XALLOC< std::pair< std::size_t, std::size_t > > >&
790  new_ranges) const {
791  const std::size_t dbsize = __parsers[0].data.database().nbRows();
792  std::vector< std::pair< std::size_t, std::size_t >,
793  ALLOC< std::pair< std::size_t, std::size_t > > >
794  incorrect_ranges;
795  for (const auto& range : new_ranges) {
796  if ((range.first >= range.second) || (range.second > dbsize)) {
797  incorrect_ranges.push_back(range);
798  }
799  }
800  if (!incorrect_ranges.empty()) {
801  std::stringstream str;
802  str << "It is impossible to set the ranges because the following one";
803  if (incorrect_ranges.size() > 1)
804  str << "s are incorrect: ";
805  else
806  str << " is incorrect: ";
807  bool deja = false;
808  for (const auto& range : incorrect_ranges) {
809  if (deja)
810  str << ", ";
811  else
812  deja = true;
813  str << '[' << range.first << ';' << range.second << ')';
814  }
815 
816  GUM_ERROR(OutOfBounds, str.str());
817  }
818  }
819 
820 
822  template < template < typename > class ALLOC >
823  void RecordCounter< ALLOC >::__dispatchRangesToThreads() {
824  __thread_ranges.clear();
825 
826  // ensure that __ranges contains the ranges asked by the user
827  bool add_range = false;
828  if (__ranges.empty()) {
829  const auto& database = __parsers[0].data.database();
830  __ranges.push_back(std::pair< std::size_t, std::size_t >(
831  std::size_t(0), database.nbRows()));
832  add_range = true;
833  }
834 
835  // dispatch the ranges
836  for (const auto& range : __ranges) {
837  if (range.second > range.first) {
838  const std::size_t range_size = range.second - range.first;
839  std::size_t nb_threads = range_size / __min_nb_rows_per_thread;
840  if (nb_threads < 1)
841  nb_threads = 1;
842  else if (nb_threads > __max_nb_threads)
843  nb_threads = __max_nb_threads;
844  std::size_t nb_rows_par_thread = range_size / nb_threads;
845  std::size_t rest_rows = range_size - nb_rows_par_thread * nb_threads;
846 
847  std::size_t begin_index = range.first;
848  for (std::size_t i = std::size_t(0); i < nb_threads; ++i) {
849  std::size_t end_index = begin_index + nb_rows_par_thread;
850  if (rest_rows != std::size_t(0)) {
851  ++end_index;
852  --rest_rows;
853  }
854  __thread_ranges.push_back(
855  std::pair< std::size_t, std::size_t >(begin_index, end_index));
856  begin_index = end_index;
857  }
858  }
859  }
860  if (add_range) __ranges.clear();
861 
862  // sort ranges by decreasing range size, so that if the number of
863  // ranges exceeds the number of threads allowed, we start a first round of
864  // threads with the highest range, then another round with lower ranges,
865  // and so on until all the ranges have been processed
866  std::sort(__thread_ranges.begin(),
867  __thread_ranges.end(),
868  [](const std::pair< std::size_t, std::size_t >& a,
869  const std::pair< std::size_t, std::size_t >& b) -> bool {
870  return (a.second - a.first) > (b.second - b.first);
871  });
872  }
873 
874 
876  template < template < typename > class ALLOC >
877  template < template < typename > class XALLOC >
879  const std::vector< std::pair< std::size_t, std::size_t >,
880  XALLOC< std::pair< std::size_t, std::size_t > > >&
881  new_ranges) {
882  // first, we check that all ranges are within the database's bounds
883  __checkRanges(new_ranges);
884 
885  // since the ranges are OK, save them and clear the counting caches
886  const std::size_t new_size = new_ranges.size();
887  std::vector< std::pair< std::size_t, std::size_t >,
888  ALLOC< std::pair< std::size_t, std::size_t > > >
889  ranges(new_size);
890  for (std::size_t i = std::size_t(0); i < new_size; ++i) {
891  ranges[i].first = new_ranges[i].first;
892  ranges[i].second = new_ranges[i].second;
893  }
894 
895  clear();
896  __ranges = std::move(ranges);
897 
898  // dispatch the ranges to the threads
899  __dispatchRangesToThreads();
900  }
901 
902 
904  template < template < typename > class ALLOC >
906  if (__ranges.empty()) return;
907  clear();
908  __ranges.clear();
909  __dispatchRangesToThreads();
910  }
911 
912 
914  template < template < typename > class ALLOC >
915  INLINE const std::vector< std::pair< std::size_t, std::size_t >,
916  ALLOC< std::pair< std::size_t, std::size_t > > >&
918  return __ranges;
919  }
920 
921 
923  template < template < typename > class ALLOC >
924  template < typename GUM_SCALAR >
925  INLINE void
926  RecordCounter< ALLOC >::setBayesNet(const BayesNet< GUM_SCALAR >& new_bn) {
927  // remove the caches
928  clear();
929 
930  // assign the new BN
931  for (auto& xparser : __parsers) {
932  xparser.data.setBayesNet(new_bn);
933  }
934  }
935 
936  } /* namespace learning */
937 
938 } /* namespace gum */
939 
940 #endif /* DOXYGEN_SHOULD_SKIP_THIS */
bool isOMP()
Is OMP active ?
virtual RecordCounter< ALLOC > * clone() const
virtual copy constructor
unsigned int getThreadNumber()
Get the calling thread id.
const Bijection< NodeId, std::size_t, ALLOC< std::size_t > > & nodeId2Columns() const
returns the mapping from ids to column positions in the database
STL namespace.
void clearRanges()
reset the ranges to the one range corresponding to the whole database
std::size_t nbThreads() const
returns the number of threads used to parse the database
allocator_type getAllocator() const
returns the allocator used
Copyright 2005-2019 Pierre-Henri WUILLEMIN et Christophe GONZALES (LIP6) {prenom.nom}_at_lip6.fr.
Definition: agrum.h:25
void setMaxNbThreads(const std::size_t nb) const
changes the max number of threads used to parse the database
const std::vector< std::pair< std::size_t, std::size_t >, ALLOC< std::pair< std::size_t, std::size_t > > > & ranges() const
returns the current ranges
void clear()
clears all the last database-parsed countings from memory
std::size_t minNbRowsPerThread() const
returns the minimum number of rows that each thread should process
Copyright 2005-2019 Pierre-Henri WUILLEMIN et Christophe GONZALES (LIP6) {prenom.nom}_at_lip6.fr.
const std::vector< double, ALLOC< double > > & counts(const IdSet< ALLOC > &ids, const bool check_discrete_vars=false)
returns the counts over all the variables in an IdSet
ALLOC< NodeId > allocator_type
type for the allocators passed in arguments of methods
void setMinNbRowsPerThread(const std::size_t nb) const
changes the minimum number of rows a thread should process in a multithreading context
virtual ~RecordCounter()
destructor
void setRanges(const std::vector< std::pair< std::size_t, std::size_t >, XALLOC< std::pair< std::size_t, std::size_t > > > &new_ranges)
sets new ranges to perform the countings
RecordCounter< ALLOC > & operator=(const RecordCounter< ALLOC > &from)
copy operator
RecordCounter(const DBRowGeneratorParser< ALLOC > &parser, const std::vector< std::pair< std::size_t, std::size_t >, ALLOC< std::pair< std::size_t, std::size_t > > > &ranges, const Bijection< NodeId, std::size_t, ALLOC< std::size_t > > &nodeId2columns=Bijection< NodeId, std::size_t, ALLOC< std::size_t > >(), const allocator_type &alloc=allocator_type())
default constructor
Size NodeId
Type for node ids.
Definition: graphElements.h:98
#define GUM_ERROR(type, msg)
Definition: exceptions.h:55
void setBayesNet(const BayesNet< GUM_SCALAR > &new_bn)
assign a new Bayes net to all the counter's generators depending on a BN
const DatabaseTable< ALLOC > & database() const
returns the database on which we perform the counts