29 #ifndef DOXYGEN_SHOULD_SKIP_THIS 37 template <
template <
typename >
class ALLOC >
// NOTE(review): the signature line of this definition is missing from the
// listing; presumably this is RecordCounter< ALLOC >::getAllocator() —
// confirm against the header.
// The visible body returns the allocator of the __parsers container.
40 return __parsers.get_allocator();
// Constructor fragment taking a parser, a set of row ranges and a mapping
// from node ids to database columns.
// NOTE(review): several original lines (function name, remaining parameters,
// the raising of the error, closing braces) are missing from this listing.
45 template <
template <
typename >
class ALLOC >
47 const DBRowGeneratorParser< ALLOC >& parser,
48 const std::vector< std::pair< std::size_t, std::size_t >,
49 ALLOC< std::pair< std::size_t, std::size_t > > >& ranges,
50 const Bijection<
NodeId, std::size_t, ALLOC< std::size_t > >&
54 __ranges(alloc), __nodeId2columns(nodeId2columns),
55 __last_DB_countings(alloc), __last_DB_ids(alloc),
56 __last_nonDB_countings(alloc), __last_nonDB_ids(alloc) {
// Validate the mapping: every column index stored in nodeId2columns must be
// lower than the number of variables of the parser's database.
58 const std::size_t db_nb_cols = parser.database().nbVariables();
59 for (
auto iter = nodeId2columns.cbegin(); iter != nodeId2columns.cend();
61 if (iter.second() >= db_nb_cols) {
63 "the mapping between ids and database columns " 64 <<
"is incorrect because Column " << iter.second()
65 <<
" does not belong to the database.");
// At least one thread is always used; one copy of the parser is stored for
// each of the __max_nb_threads threads.
70 if (__max_nb_threads < std::size_t(1)) __max_nb_threads = std::size_t(1);
71 __parsers.reserve(__max_nb_threads);
72 for (std::size_t i = std::size_t(0); i < __max_nb_threads; ++i)
73 __parsers.push_back(parser);
// Copy the ranges passed in argument into __ranges.
78 __ranges.reserve(
ranges.size());
79 for (
const auto& range :
ranges)
80 __ranges.push_back(range);
// Compute the per-thread ranges from the ranges just stored.
83 __dispatchRangesToThreads();
90 template <
template <
typename >
class ALLOC >
92 const DBRowGeneratorParser< ALLOC >& parser,
93 const Bijection<
NodeId, std::size_t, ALLOC< std::size_t > >&
99 ALLOC<
std::pair<
std::size_t,
std::size_t > > >(),
// Copy-constructor fragment: each member is initialized from the
// corresponding member of `from`; the allocator `alloc` is forwarded to the
// members constructed with two arguments below.
// NOTE(review): the function name and the declaration of `alloc` are on
// lines missing from this listing.
105 template <
template <
typename >
class ALLOC >
107 const RecordCounter< ALLOC >& from,
109 __parsers(from.__parsers, alloc),
110 __ranges(from.__ranges, alloc),
111 __thread_ranges(from.__thread_ranges, alloc),
112 __nodeId2columns(from.__nodeId2columns),
113 __last_DB_countings(from.__last_DB_countings, alloc),
114 __last_DB_ids(from.__last_DB_ids),
115 __last_nonDB_countings(from.__last_nonDB_countings, alloc),
116 __last_nonDB_ids(from.__last_nonDB_ids),
117 __max_nb_threads(from.__max_nb_threads),
118 __min_nb_rows_per_thread(from.__min_nb_rows_per_thread) {
124 template <
template <
typename >
class ALLOC >
// Move-constructor fragment: containers and id sets are moved out of `from`,
// while the two thread-related scalar settings are copied.
// NOTE(review): the function name and the declaration of `alloc` are on
// lines missing from this listing.
130 template <
template <
typename >
class ALLOC >
132 RecordCounter< ALLOC >&& from,
134 __parsers(
std::move(from.__parsers), alloc),
135 __ranges(
std::move(from.__ranges), alloc),
136 __thread_ranges(
std::move(from.__thread_ranges), alloc),
137 __nodeId2columns(
std::move(from.__nodeId2columns)),
138 __last_DB_countings(
std::move(from.__last_DB_countings), alloc),
139 __last_DB_ids(
std::move(from.__last_DB_ids)),
140 __last_nonDB_countings(
std::move(from.__last_nonDB_countings), alloc),
141 __last_nonDB_ids(
std::move(from.__last_nonDB_ids)),
142 __max_nb_threads(from.__max_nb_threads),
143 __min_nb_rows_per_thread(from.__min_nb_rows_per_thread) {
149 template <
template <
typename >
class ALLOC >
// Fragment of a cloning routine: storage for one RecordCounter is obtained
// from an allocator built from `alloc`, then a copy of *this is constructed
// in that storage.
// NOTE(review): the signature, the surrounding error handling and the return
// statement are missing from this listing; presumably this is the
// clone(alloc) overload — confirm against the header.
155 template <
template <
typename >
class ALLOC >
158 ALLOC< RecordCounter< ALLOC > > allocator(alloc);
159 RecordCounter< ALLOC >* new_counter = allocator.allocate(1);
161 allocator.construct(new_counter, *
this, alloc);
// NOTE(review): presumably executed only when the construction above fails
// (the enclosing try/catch lines are not visible) — confirm.
163 allocator.deallocate(new_counter, 1);
172 template <
template <
typename >
class ALLOC >
179 template <
template <
typename >
class ALLOC >
// Copy assignment: every member of `from` is copy-assigned to the
// corresponding member of this object.
// NOTE(review): the return type line, any self-assignment guard and the
// return statement are on lines missing from this listing.
186 template <
template <
typename >
class ALLOC >
188 operator=(
const RecordCounter< ALLOC >& from) {
190 __parsers = from.__parsers;
191 __ranges = from.__ranges;
192 __thread_ranges = from.__thread_ranges;
193 __nodeId2columns = from.__nodeId2columns;
194 __last_DB_countings = from.__last_DB_countings;
195 __last_DB_ids = from.__last_DB_ids;
196 __last_nonDB_countings = from.__last_nonDB_countings;
197 __last_nonDB_ids = from.__last_nonDB_ids;
198 __max_nb_threads = from.__max_nb_threads;
199 __min_nb_rows_per_thread = from.__min_nb_rows_per_thread;
// Move assignment: containers and id sets are move-assigned from `from`,
// while the two thread-related scalar settings are copied.
// NOTE(review): the return type line, any self-assignment guard and the
// return statement are on lines missing from this listing.
206 template <
template <
typename >
class ALLOC >
208 operator=(RecordCounter< ALLOC >&& from) {
210 __parsers = std::move(from.__parsers);
211 __ranges = std::move(from.__ranges);
212 __thread_ranges = std::move(from.__thread_ranges);
213 __nodeId2columns = std::move(from.__nodeId2columns);
214 __last_DB_countings = std::move(from.__last_DB_countings);
215 __last_DB_ids = std::move(from.__last_DB_ids);
216 __last_nonDB_countings = std::move(from.__last_nonDB_countings);
217 __last_nonDB_ids = std::move(from.__last_nonDB_ids);
218 __max_nb_threads = from.__max_nb_threads;
219 __min_nb_rows_per_thread = from.__min_nb_rows_per_thread;
// Empties the four cache members: the last countings and id sets, both the
// "DB" ones and the "nonDB" ones.
// NOTE(review): the signature line is missing from this listing; presumably
// this is RecordCounter< ALLOC >::clear() — confirm against the header.
226 template <
template <
typename >
class ALLOC >
228 __last_DB_countings.clear();
229 __last_DB_ids.clear();
230 __last_nonDB_countings.clear();
231 __last_nonDB_ids.clear();
// Sets __max_nb_threads from `nb`: a value of 0, or the absence of OMP
// (isOMP() returning false), forces the value 1.
// NOTE(review): the signature and the `else` line are missing from this
// listing; presumably this is setMaxNbThreads(nb) — confirm.
236 template <
template <
typename >
class ALLOC >
238 if (nb == std::size_t(0) || !
isOMP())
239 __max_nb_threads = std::size_t(1);
241 __max_nb_threads = nb;
// Returns the current value of __max_nb_threads.
// NOTE(review): the signature line is missing from this listing; presumably
// this is nbThreads() — confirm against the header.
246 template <
template <
typename >
class ALLOC >
248 return __max_nb_threads;
// Sets __min_nb_rows_per_thread from `nb`; a value of 0 is replaced by 1.
// NOTE(review): the signature and the `else` line are missing from this
// listing; presumably this is setMinNbRowsPerThread(nb) — confirm.
254 template <
template <
typename >
class ALLOC >
257 if (nb == std::size_t(0))
258 __min_nb_rows_per_thread = std::size_t(1);
260 __min_nb_rows_per_thread = nb;
// Returns the current value of __min_nb_rows_per_thread.
// NOTE(review): the signature line is missing from this listing; presumably
// this is minNbRowsPerThread() — confirm against the header.
265 template <
template <
typename >
class ALLOC >
267 return __min_nb_rows_per_thread;
// Builds the error message reporting that counts were requested on
// continuous variables.
// @param bad_vars the names of the offending variables
// NOTE(review): the end of the function (the body of the loop over the names
// and the statement that actually raises the error) is missing from this
// listing.
272 template <
template <
typename >
class ALLOC >
273 void RecordCounter< ALLOC >::__raiseCheckException(
274 const std::vector< std::string, ALLOC< std::string > >& bad_vars)
const {
276 std::stringstream msg;
277 msg <<
"Counts cannot be performed on continuous variables. ";
278 msg <<
"Unfortunately the following variable";
// Singular wording when exactly one variable is reported; the plural wording
// below presumably belongs to an `else` branch whose line is not visible.
279 if (bad_vars.size() == 1)
280 msg <<
" is continuous: " << bad_vars[0];
282 msg <<
"s are continuous: ";
284 for (
const auto& name : bad_vars) {
// Scans the variables designated by `ids`, collects the names of the
// offending ones into bad_vars and passes them to __raiseCheckException.
// @param ids the set of ids to check
// NOTE(review): the tests deciding whether a variable is offending are on
// lines missing from this listing; given the message built by
// __raiseCheckException, they presumably test for continuous variables.
297 template <
template <
typename >
class ALLOC >
298 void RecordCounter< ALLOC >::__checkDiscreteVariables(
299 const IdSet< ALLOC >& ids)
const {
300 const std::size_t size = ids.size();
301 const DatabaseTable< ALLOC >&
database = __parsers[0].data.database();
// Without an id-to-column mapping, the loop index i is used directly as the
// index of the database variable.
303 if (__nodeId2columns.empty()) {
305 for (std::size_t i = std::size_t(0); i < size; ++i) {
// First offending variable found: start the list with its name, then keep
// scanning the remaining indices before raising the error.
310 std::vector< std::string, ALLOC< std::string > > bad_vars{
311 database.variable(i).name()};
312 for (++i; i < size; ++i) {
314 bad_vars.push_back(database.variable(i).name());
316 __raiseCheckException(bad_vars);
// With a mapping, each id is first translated into its column `pos`.
321 for (std::size_t i = std::size_t(0); i < size; ++i) {
323 std::size_t pos = __nodeId2columns.second(ids[i]);
329 std::vector< std::string, ALLOC< std::string > > bad_vars{
330 database.variable(pos).name()};
331 for (++i; i < size; ++i) {
332 pos = __nodeId2columns.second(ids[i]);
334 bad_vars.push_back(database.variable(pos).name());
336 __raiseCheckException(bad_vars);
// Returns a const reference to the __nodeId2columns bijection.
// NOTE(review): the line holding the function name is missing from this
// listing; presumably this is nodeId2Columns() — confirm against the header.
344 template <
template <
typename >
class ALLOC >
345 INLINE
const Bijection< NodeId, std::size_t, ALLOC< std::size_t > >&
347 return __nodeId2columns;
// Returns the database of the first parser stored in __parsers.
// NOTE(review): the signature line is missing from this listing; presumably
// this is database() — confirm against the header.
352 template <
template <
typename >
class ALLOC >
354 return __parsers[0].data.database();
// Returns the counting vector for `ids`, reusing the cached countings when
// they cover `ids` and parsing the database otherwise.
// @param check_discrete_vars when true, __checkDiscreteVariables(ids) is
// called before the database is parsed
// NOTE(review): the line with the function name and the `ids` parameter, and
// the condition guarding the first return, are missing from this listing;
// presumably this is counts(ids, check_discrete_vars) — confirm.
359 template <
template <
typename >
class ALLOC >
360 INLINE
const std::vector< double, ALLOC< double > >&
362 const bool check_discrete_vars) {
// Early exit returning the emptied "nonDB" cache (its guarding condition is
// not visible).
365 __last_nonDB_ids.clear();
366 __last_nonDB_countings.clear();
367 return __last_nonDB_countings;
// Try the "nonDB" cache first, then the "DB" cache: when the cached id set
// contains `ids`, the answer is extracted from the cached countings.
372 if (__last_nonDB_ids.contains(ids))
373 return __extractFromCountings(
374 ids, __last_nonDB_ids, __last_nonDB_countings);
375 else if (__last_DB_ids.contains(ids))
376 return __extractFromCountings(ids, __last_DB_ids, __last_DB_countings);
// Otherwise count from the database.
378 if (check_discrete_vars) __checkDiscreteVariables(ids);
379 return __countFromDatabase(ids);
// Builds a hash table associating each id of `ids` with a column index.
// @param ids the ids to translate
// NOTE(review): the `else` line and the return statement are missing from
// this listing; the table `res` is presumably what is returned.
386 template <
template <
typename >
class ALLOC >
387 HashTable< NodeId, std::size_t > RecordCounter< ALLOC >::__getNodeIds2Columns(
388 const IdSet< ALLOC >& ids)
const {
389 HashTable< NodeId, std::size_t > res(ids.size());
// Without a mapping, the id itself (converted to std::size_t) is the column.
390 if (__nodeId2columns.empty()) {
391 for (
const auto id : ids) {
392 res.insert(
id, std::size_t(
id));
// With a mapping, the column is looked up in __nodeId2columns.
395 for (
const auto id : ids) {
396 res.insert(
id, __nodeId2columns.second(
id));
// Computes the countings over subset_ids by summing entries of a counting
// vector defined over superset_ids. The result is stored in
// __last_nonDB_countings (with __last_nonDB_ids set to subset_ids) and a
// reference to that member is returned.
// @param subset_ids the ids whose countings are wanted
// @param superset_ids the ids over which superset_vect is defined
// @param superset_vect the countings over superset_ids
// NOTE(review): many original lines are missing from this listing (branch
// conditions, loop increments, closing braces, presumably try/catch lines);
// the comments below only describe what the visible lines establish.
404 template <
template <
typename >
class ALLOC >
405 INLINE std::vector< double, ALLOC< double > >&
406 RecordCounter< ALLOC >::__extractFromCountings(
407 const IdSet< ALLOC >& subset_ids,
408 const IdSet< ALLOC >& superset_ids,
409 const std::vector<
double, ALLOC< double > >& superset_vect) {
413 const auto nodeId2columns = __getNodeIds2Columns(superset_ids);
// The result has one cell per combination of values of the subset variables:
// its size is the product of their domain sizes.
417 const auto& database = __parsers[0].data.database();
418 std::size_t result_vect_size = std::size_t(1);
419 for (
const auto id : subset_ids) {
420 result_vect_size *= database.domainSize(nodeId2columns[
id]);
424 const std::size_t subset_ids_size = std::size_t(subset_ids.size());
425 std::vector< double, ALLOC< double > > result_vect(result_vect_size, 0.0);
// Case 1: the subset ids occupy the first positions of the superset, in the
// same order (subset id i is at position i of superset_ids).
432 bool subset_begin =
true;
433 for (std::size_t i = 0; i < subset_ids_size; ++i) {
434 if (superset_ids.pos(subset_ids[i]) != i) {
435 subset_begin =
false;
// superset_vect is scanned sequentially in chunks of result_vect_size cells,
// each chunk being added element-wise to result_vect.
441 const std::size_t superset_vect_size = superset_vect.size();
442 std::size_t i = std::size_t(0);
443 while (i < superset_vect_size) {
444 for (std::size_t j = std::size_t(0); j < result_vect_size; ++j, ++i) {
445 result_vect[j] += superset_vect[i];
451 __last_nonDB_ids = subset_ids;
452 __last_nonDB_countings = std::move(result_vect);
453 return __last_nonDB_countings;
// NOTE(review): presumably an error path that resets the cache (the enclosing
// lines are not visible) — confirm.
455 __last_nonDB_ids.clear();
456 __last_nonDB_countings.clear();
// Case 2: the subset ids occupy the last positions of the superset, in the
// same order.
465 bool subset_end =
true;
466 const std::size_t superset_ids_size = std::size_t(superset_ids.size());
467 for (std::size_t i = 0; i < subset_ids_size; ++i) {
468 if (superset_ids.pos(subset_ids[i])
469 != i + superset_ids_size - subset_ids_size) {
// Product of the domain sizes of the leading superset ids that do not belong
// to the subset.
478 std::size_t vect_not_subset_size = std::size_t(1);
479 for (std::size_t i = std::size_t(0);
480 i < superset_ids_size - subset_ids_size;
482 vect_not_subset_size *=
483 database.domainSize(nodeId2columns[superset_ids[i]]);
// Each result cell j accumulates a run of cells of superset_vect.
// NOTE(review): the increment of the inner loop is not visible; presumably i
// advances together with k — confirm.
486 std::size_t i = std::size_t(0);
487 for (std::size_t j = std::size_t(0); j < result_vect_size; ++j) {
488 for (std::size_t k = std::size_t(0); k < vect_not_subset_size;
490 result_vect[j] += superset_vect[i];
496 __last_nonDB_ids = subset_ids;
497 __last_nonDB_countings = std::move(result_vect);
498 return __last_nonDB_countings;
500 __last_nonDB_ids.clear();
501 __last_nonDB_countings.clear();
// Remaining case: per-subset-variable tables, indexed by j.
// NOTE(review): j presumably counts the subset variables in their order of
// appearance in superset_ids (the loop header is only partly visible).
533 std::vector< std::size_t > before_incr(subset_ids_size);
534 std::vector< std::size_t > result_domain(subset_ids_size);
535 std::vector< std::size_t > result_offset(subset_ids_size);
537 std::size_t result_domain_size = std::size_t(1);
538 std::size_t tmp_before_incr = std::size_t(1);
539 std::vector< std::size_t > superset_order(subset_ids_size);
// superset_order maps the position of a variable in subset_ids to its index j.
541 for (std::size_t h = std::size_t(0), j = std::size_t(0);
544 if (subset_ids.exists(superset_ids[h])) {
545 before_incr[j] = tmp_before_incr - 1;
546 superset_order[subset_ids.pos(superset_ids[h])] = j;
551 database.domainSize(nodeId2columns[superset_ids[h]]);
// For each subset variable (in subset order): record its domain size and its
// offset in result_vect, i.e. the product of the domain sizes of the subset
// variables preceding it.
556 for (std::size_t i = 0; i < subset_ids.size(); ++i) {
557 const std::size_t domain_size =
558 database.domainSize(nodeId2columns[subset_ids[i]]);
559 const std::size_t j = superset_order[i];
560 result_domain[j] = domain_size;
561 result_offset[j] = result_domain_size;
562 result_domain_size *= domain_size;
// Working copies of the tables, updated while superset_vect is scanned.
566 std::vector< std::size_t > result_value(result_domain);
567 std::vector< std::size_t > current_incr(before_incr);
568 std::vector< std::size_t > result_down(result_offset);
// result_down[j] = result_offset[j] * (result_domain[j] - 1); it is subtracted
// from the current offset further below.
570 for (std::size_t j = std::size_t(0); j < result_down.size(); ++j) {
571 result_down[j] *= (result_domain[j] - 1);
// Every cell of superset_vect is added to the result cell designated by
// the_result_offset, which is then updated for the next cell.
575 const std::size_t superset_vect_size = superset_vect.size();
576 std::size_t the_result_offset = std::size_t(0);
577 for (std::size_t h = std::size_t(0); h < superset_vect_size; ++h) {
578 result_vect[the_result_offset] += superset_vect[h];
581 for (std::size_t k = 0; k < current_incr.size(); ++k) {
583 if (current_incr[k]) {
588 current_incr[k] = before_incr[k];
593 if (result_value[k]) {
594 the_result_offset += result_offset[k];
598 result_value[k] = result_domain[k];
599 the_result_offset -= result_down[k];
605 __last_nonDB_ids = subset_ids;
606 __last_nonDB_countings = std::move(result_vect);
607 return __last_nonDB_countings;
609 __last_nonDB_ids.clear();
610 __last_nonDB_countings.clear();
// Parses the database, over the ranges stored in __thread_ranges, to compute
// the weighted countings of the variables of `ids`; the result is stored in
// __last_DB_countings and a reference to that member is returned.
// @param ids the ids of the variables to count
// NOTE(review): several original lines are missing from this listing (loop
// increments, the OpenMP block braces, the `try` line, closing braces).
617 template <
template <
typename >
class ALLOC >
618 std::vector< double, ALLOC< double > >&
619 RecordCounter< ALLOC >::__countFromDatabase(
const IdSet< ALLOC >& ids) {
// Nothing to count: empty the "nonDB" cache and return it.
622 const auto& database = __parsers[0].data.database();
623 if (ids.empty() || database.empty() || __thread_ranges.empty()) {
624 __last_nonDB_countings.clear();
625 __last_nonDB_ids.clear();
626 return __last_nonDB_countings;
631 const auto nodeId2columns = __getNodeIds2Columns(ids);
// For each id, record its domain size, its database column and its offset in
// the counting vector (the product of the domain sizes of the ids before it).
// counting_vect_size ends up as the product of all the domain sizes.
635 const std::size_t ids_size = ids.size();
636 std::size_t counting_vect_size = std::size_t(1);
637 std::vector< std::size_t, ALLOC< std::size_t > > domain_sizes(ids_size);
638 std::vector< std::pair< std::size_t, std::size_t >,
639 ALLOC< std::pair< std::size_t, std::size_t > > >
640 cols_offsets(ids_size);
642 std::size_t i = std::size_t(0);
643 for (
const auto id : ids) {
644 const std::size_t domain_size = database.domainSize(nodeId2columns[
id]);
645 domain_sizes[i] = domain_size;
646 cols_offsets[i].first = nodeId2columns[id];
647 cols_offsets[i].second = counting_vect_size;
648 counting_vect_size *= domain_size;
// Sort the (column, offset) pairs by increasing column index.
655 std::sort(cols_offsets.begin(),
657 [](
const std::pair< std::size_t, std::size_t >& a,
658 const std::pair< std::size_t, std::size_t >& b) ->
bool {
659 return a.first < b.first;
// Use at most __max_nb_threads threads, and never more than there are ranges;
// copies of the first parser are appended until there is one per thread.
663 const std::size_t nb_ranges = __thread_ranges.size();
664 const std::size_t nb_threads =
665 nb_ranges <= __max_nb_threads ? nb_ranges : __max_nb_threads;
666 while (__parsers.size() < nb_threads) {
667 ThreadData< DBRowGeneratorParser< ALLOC > > new_parser(__parsers[0]);
668 __parsers.push_back(std::move(new_parser));
// Tell every parser which columns are needed.
674 std::vector< std::size_t, ALLOC< std::size_t > > cols_of_interest(ids_size);
675 for (std::size_t i = std::size_t(0); i < ids_size; ++i) {
676 cols_of_interest[i] = cols_offsets[i].first;
678 for (
auto& parser : __parsers) {
679 parser.data.setColumnsOfInterest(cols_of_interest);
// The global counting vector, and the per-thread vectors built from it.
// NOTE(review): the initial value and the name/size of the per-thread
// container are on lines not visible here.
685 std::vector< double, ALLOC< double > > counting_vect(counting_vect_size,
687 std::vector< ThreadData< std::vector< double, ALLOC< double > > >,
688 ALLOC< ThreadData< std::vector< double, ALLOC< double > > > > >
691 ThreadData< std::vector<
double, ALLOC< double > > >(counting_vect));
// The ranges are processed in batches of nb_threads: within a batch, each
// thread handles the range of index this_thread + i with its own parser and
// its own counting vector.
// NOTE(review): the definition of this_thread is not visible; presumably it
// is the id of the calling thread — confirm.
697 for (std::size_t i = std::size_t(0); i < nb_ranges; i += nb_threads) {
698 # pragma omp parallel num_threads(int(nb_threads)) 702 if (this_thread + i < nb_ranges) {
703 DBRowGeneratorParser< ALLOC >& parser = __parsers[this_thread].data;
704 parser.setRange(__thread_ranges[this_thread + i].first,
705 __thread_ranges[this_thread + i].second);
706 std::vector< double, ALLOC< double > >& countings =
707 thread_countings[this_thread].data;
711 while (parser.hasRows()) {
713 const DBRow< DBTranslatedValue >& row = parser.row();
// The cell to update is computed from the discrete value of each column of
// interest multiplied by that column's offset (the accumulation into `offset`
// is presumably on the missing line just before the expression below); the
// weight of the row is then added to that cell.
716 std::size_t offset = std::size_t(0);
717 for (std::size_t i = std::size_t(0); i < ids_size; ++i) {
719 row[cols_offsets[i].first].discr_val * cols_offsets[i].second;
722 countings[offset] += row.weight();
// NotFound exceptions are caught and ignored here.
// NOTE(review): which call raises NotFound is not visible — confirm.
724 }
catch (NotFound&) {}
// Add the per-thread countings into the global counting vector.
733 for (std::size_t k = std::size_t(0); k < nb_threads; ++k) {
734 const auto& thread_counting = thread_countings[k].data;
735 for (std::size_t r = std::size_t(0); r < counting_vect_size; ++r) {
736 counting_vect[r] += thread_counting[r];
// Cache the result and return a reference to the cached vector.
742 __last_DB_countings = std::move(counting_vect);
744 return __last_DB_countings;
// Parses the rows of the range [begin, end) set on `parser` and adds the
// weight of each row to the corresponding cell of `countings`.
// @param begin first bound passed to parser.setRange
// @param end second bound passed to parser.setRange
// @param parser the parser used to read the rows
// @param cols_offsets pairs (column, offset) used to locate the cell to update
// @param countings the vector receiving the weights
// NOTE(review): the parameter-name line of cols_offsets, the `try` line and
// the closing braces are missing from this listing.
749 template <
template <
typename >
class ALLOC >
750 void RecordCounter< ALLOC >::__threadedCount(
751 const std::size_t begin,
752 const std::size_t end,
753 DBRowGeneratorParser< ALLOC >& parser,
754 const std::vector< std::pair< std::size_t, std::size_t >,
755 ALLOC< std::pair< std::size_t, std::size_t > > >&
757 std::vector<
double, ALLOC< double > >& countings) {
758 parser.setRange(begin, end);
761 const std::size_t nb_columns = cols_offsets.size();
762 while (parser.hasRows()) {
764 const DBRow< DBTranslatedValue >& row = parser.row();
// The cell index is built from the discrete value of each column multiplied by
// that column's offset (the accumulation into `offset` is presumably on the
// missing line just before the expression below).
767 std::size_t offset = std::size_t(0);
768 for (std::size_t i = std::size_t(0); i < nb_columns; ++i) {
770 row[cols_offsets[i].first].discr_val * cols_offsets[i].second;
773 countings[offset] += row.weight();
// NotFound exceptions are caught and ignored here.
775 }
catch (NotFound&) {}
// Checks a set of ranges against the database: a range [first, second) is
// incorrect when first >= second or when second exceeds the number of rows.
// The incorrect ranges are collected and listed in an error message.
// NOTE(review): the parameter name line (presumably new_ranges), the
// declaration line of incorrect_ranges, the `else` line and the statement
// raising the error are missing from this listing.
782 template <
template <
typename >
class ALLOC >
783 template <
template <
typename >
class XALLOC >
784 void RecordCounter< ALLOC >::__checkRanges(
785 const std::vector< std::pair< std::size_t, std::size_t >,
786 XALLOC< std::pair< std::size_t, std::size_t > > >&
788 const std::size_t dbsize = __parsers[0].data.database().nbRows();
789 std::vector< std::pair< std::size_t, std::size_t >,
790 ALLOC< std::pair< std::size_t, std::size_t > > >
792 for (
const auto& range : new_ranges) {
793 if ((range.first >= range.second) || (range.second > dbsize)) {
794 incorrect_ranges.push_back(range);
// Build the message, with plural or singular wording depending on the number
// of incorrect ranges; each range is printed as "[first;second)".
797 if (!incorrect_ranges.empty()) {
798 std::stringstream str;
799 str <<
"It is impossible to set the ranges because the following one";
800 if (incorrect_ranges.size() > 1)
801 str <<
"s are incorrect: ";
803 str <<
" is incorrect: ";
805 for (
const auto& range : incorrect_ranges) {
810 str <<
'[' << range.first <<
';' << range.second <<
')';
// Rebuilds __thread_ranges by splitting each range of __ranges into
// contiguous sub-ranges, then sorts the sub-ranges by decreasing size.
// NOTE(review): several original lines are missing from this listing (the
// assignment of add_range, the handling of nb_threads before the `else if`,
// the distribution of rest_rows, closing braces).
819 template <
template <
typename >
class ALLOC >
820 void RecordCounter< ALLOC >::__dispatchRangesToThreads() {
821 __thread_ranges.clear();
// When no range is set, the range [0, nbRows) of the database is inserted
// into __ranges; add_range presumably records that insertion so that __ranges
// can be emptied again at the end.
824 bool add_range =
false;
825 if (__ranges.empty()) {
826 const auto& database = __parsers[0].data.database();
827 __ranges.push_back(std::pair< std::size_t, std::size_t >(
828 std::size_t(0), database.nbRows()));
// Only the ranges with second > first are split.
833 for (
const auto& range : __ranges) {
834 if (range.second > range.first) {
// Number of sub-ranges: the range size divided by the minimal number of rows
// per thread, capped at __max_nb_threads.
835 const std::size_t range_size = range.second - range.first;
836 std::size_t nb_threads = range_size / __min_nb_rows_per_thread;
839 else if (nb_threads > __max_nb_threads)
840 nb_threads = __max_nb_threads;
// rest_rows is the remainder of the division of the rows among the sub-ranges.
841 std::size_t nb_rows_par_thread = range_size / nb_threads;
842 std::size_t rest_rows = range_size - nb_rows_par_thread * nb_threads;
// Sub-ranges are contiguous: each one starts where the previous one ended.
// NOTE(review): the body of the rest_rows test is not visible; presumably it
// lengthens the sub-range by one row while rows remain — confirm.
844 std::size_t begin_index = range.first;
845 for (std::size_t i = std::size_t(0); i < nb_threads; ++i) {
846 std::size_t end_index = begin_index + nb_rows_par_thread;
847 if (rest_rows != std::size_t(0)) {
851 __thread_ranges.push_back(
852 std::pair< std::size_t, std::size_t >(begin_index, end_index));
853 begin_index = end_index;
857 if (add_range) __ranges.clear();
// Largest sub-ranges first.
863 std::sort(__thread_ranges.begin(),
864 __thread_ranges.end(),
865 [](
const std::pair< std::size_t, std::size_t >& a,
866 const std::pair< std::size_t, std::size_t >& b) ->
bool {
867 return (a.second - a.first) > (b.second - b.first);
// Replaces __ranges by a copy of new_ranges after checking them with
// __checkRanges, then recomputes the per-thread ranges.
// NOTE(review): the line with the function name, the parameter name line and
// the declaration line of the local `ranges` vector are missing from this
// listing; presumably this is setRanges(new_ranges) — confirm.
873 template <
template <
typename >
class ALLOC >
874 template <
template <
typename >
class XALLOC >
876 const std::vector< std::pair< std::size_t, std::size_t >,
877 XALLOC< std::pair< std::size_t, std::size_t > > >&
880 __checkRanges(new_ranges);
// The ranges are copied element by element because new_ranges may use another
// allocator template (XALLOC) than the member (ALLOC).
883 const std::size_t new_size = new_ranges.size();
884 std::vector< std::pair< std::size_t, std::size_t >,
885 ALLOC< std::pair< std::size_t, std::size_t > > >
887 for (std::size_t i = std::size_t(0); i < new_size; ++i) {
888 ranges[i].first = new_ranges[i].first;
889 ranges[i].second = new_ranges[i].second;
893 __ranges = std::move(ranges);
896 __dispatchRangesToThreads();
// Does nothing when __ranges is already empty; otherwise the per-thread
// ranges are recomputed with __dispatchRangesToThreads.
// NOTE(review): the signature and the statement between the test and the
// dispatch are missing from this listing; presumably this is clearRanges()
// and the missing statement empties __ranges — confirm.
901 template <
template <
typename >
class ALLOC >
903 if (__ranges.empty())
return;
906 __dispatchRangesToThreads();
911 template <
template <
typename >
class ALLOC >
912 INLINE
const std::vector< std::pair< std::size_t, std::size_t >,
913 ALLOC< std::pair< std::size_t, std::size_t > > >&
// Forwards new_bn to every parser stored in __parsers through their
// setBayesNet method.
// NOTE(review): the signature lines are missing from this listing; presumably
// this is setBayesNet(const BayesNet< GUM_SCALAR >& new_bn) — confirm.
920 template <
template <
typename >
class ALLOC >
921 template <
typename GUM_SCALAR >
928 for (
auto& xparser : __parsers) {
929 xparser.data.setBayesNet(new_bn);
bool isOMP()
Is OMP active?
virtual RecordCounter< ALLOC > * clone() const
virtual copy constructor
unsigned int getThreadNumber()
Get the calling thread id.
const Bijection< NodeId, std::size_t, ALLOC< std::size_t > > & nodeId2Columns() const
returns the mapping from ids to column positions in the database
void clearRanges()
resets the ranges to the single range corresponding to the whole database
std::size_t nbThreads() const
returns the number of threads used to parse the database
allocator_type getAllocator() const
returns the allocator used
gum is the global namespace for all aGrUM entities
void setMaxNbThreads(const std::size_t nb) const
changes the max number of threads used to parse the database
const std::vector< std::pair< std::size_t, std::size_t >, ALLOC< std::pair< std::size_t, std::size_t > > > & ranges() const
returns the current ranges
void clear()
clears all the last database-parsed countings from memory
std::size_t minNbRowsPerThread() const
returns the minimum number of rows that each thread should process
The class that computes countings of observations from the database.
const std::vector< double, ALLOC< double > > & counts(const IdSet< ALLOC > &ids, const bool check_discrete_vars=false)
returns the counts over all the variables in an IdSet
ALLOC< NodeId > allocator_type
type for the allocators passed in arguments of methods
void setMinNbRowsPerThread(const std::size_t nb) const
changes the minimum number of rows a thread should process in a multithreading context ...
virtual ~RecordCounter()
destructor
void setRanges(const std::vector< std::pair< std::size_t, std::size_t >, XALLOC< std::pair< std::size_t, std::size_t > > > &new_ranges)
sets new ranges to perform the countings
RecordCounter< ALLOC > & operator=(const RecordCounter< ALLOC > &from)
copy operator
RecordCounter(const DBRowGeneratorParser< ALLOC > &parser, const std::vector< std::pair< std::size_t, std::size_t >, ALLOC< std::pair< std::size_t, std::size_t > > > &ranges, const Bijection< NodeId, std::size_t, ALLOC< std::size_t > > &nodeId2columns=Bijection< NodeId, std::size_t, ALLOC< std::size_t > >(), const allocator_type &alloc=allocator_type())
default constructor
Size NodeId
Type for node ids.
#define GUM_ERROR(type, msg)
void setBayesNet(const BayesNet< GUM_SCALAR > &new_bn)
assign a new Bayes net to all the counter's generators depending on a BN
const DatabaseTable< ALLOC > & database() const
returns the database on which we perform the counts