From ca74947b6354d020cbae998047f2b53ed58f3466 Mon Sep 17 00:00:00 2001 From: mattdowle Date: Fri, 7 Dec 2018 01:02:10 -0800 Subject: [PATCH 01/10] interim --- src/gsumm.c | 25 +++++++++++++++++-------- 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/src/gsumm.c b/src/gsumm.c index 035cfe1950..0c14fddede 100644 --- a/src/gsumm.c +++ b/src/gsumm.c @@ -51,23 +51,32 @@ SEXP gforce(SEXP env, SEXP jsub, SEXP o, SEXP f, SEXP l, SEXP irowsArg) { grp = (int *)R_alloc(grpn, sizeof(int)); // global grp because the g* functions (inside jsub) share this common memory - maxgrpn = 0; + const int *restrict fdp = INTEGER(f); if (LENGTH(o)) { isunsorted = 1; // for gmedian - for (int g=0, *od=INTEGER(o), *fd=INTEGER(f); gmaxgrpn) maxgrpn = grpsize[g]; // recalculate (may as well since looping anyway) and check below } } else { - for (int g=0, *fd=INTEGER(f); gmaxgrpn) maxgrpn = grpsize[g]; // needed for #2046 and #2111 when maxgrpn attribute is not attached to empty o } } SEXP tt = getAttrib(o, install("maxgrpn")); - if (length(tt) && INTEGER(tt)[0]!=maxgrpn) error("Internal error: o's maxgrpn mismatches recalculated maxgrpn"); // # nocov + if (length(tt)!=1) { + // seems to happen, and finding the maxgrpn is needed, but not sure why (TODO - trace) + // old comment to be checked: 'needed for #2046 and #2111 when maxgrpn attribute is not attached to empty o' + maxgrpn = 0; + for (int g=0; gmaxgrpn) maxgrpn = grpsize[g]; + } else { + // && INTEGER(tt)[0]!=maxgrpn) error("Internal error: o's maxgrpn mismatches recalculated maxgrpn"); // # nocov + maxgrpn = INTEGER(tt)[0]; + } oo = INTEGER(o); ff = INTEGER(f); From 60e6c4044805deab43a2c90f8f44de9f13273ee8 Mon Sep 17 00:00:00 2001 From: mattdowle Date: Sat, 8 Dec 2018 01:42:01 -0800 Subject: [PATCH 02/10] interim --- src/gsumm.c | 239 +++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 173 insertions(+), 66 deletions(-) diff --git a/src/gsumm.c b/src/gsumm.c index 0c14fddede..f0d33cce4d 100644 --- a/src/gsumm.c +++ b/src/gsumm.c @@ -1,12 +1,19 @@ #include "data.table.h" //#include -static int *grp = NULL; // the group of each x item, like a factor static int ngrp = 0; // number of groups static int *grpsize = NULL; // size of each group, used by gmean (and gmedian) not gsum -static int grpn = 0; // length of underlying x == length(grp) +static int nrow = 0; // length of underlying x; same as length(ghigh) and length(glow) static int *irows; // GForce support for subsets in 'i' (TODO: joins in 'i') static int irowslen = -1; // -1 is for irows = NULL +static uint16_t *high=NULL, *low=NULL; // the group of each x item; a.k.a. which-group-am-I +static int *restrict grp; // TODO: eventually this can be made local for gforce as won't be needed globally when all functions here use gather +static size_t highSize; +static int shift, mask; +static char *gx=NULL; + +static size_t nBatch, batchSize, lastBatchSize; +static int *counts, *tmpcounts; // for gmedian static int maxgrpn = 0; @@ -43,13 +50,27 @@ SEXP gforce(SEXP env, SEXP jsub, SEXP o, SEXP f, SEXP l, SEXP irowsArg) { else error("irowsArg is neither an integer vector nor NULL"); ngrp = LENGTH(l); if (LENGTH(f) != ngrp) error("length(f)=%d != length(l)=%d", LENGTH(f), ngrp); - grpn=0; + nrow=0; grpsize = INTEGER(l); - for (int i=0; imaxgrpn) maxgrpn = grpsize[i]; // old comment to be checked: 'needed for #2046 and #2111 when maxgrpn attribute is not attached to empty o' + } + if (LENGTH(o) && LENGTH(o)!=nrow) error("o has length %d but sum(l)=%d", LENGTH(o), nrow); + { + SEXP tt = getAttrib(o, install("maxgrpn")); + if (length(tt)==1 && INTEGER(tt)[0]!=maxgrpn) error("Internal error: o's maxgrpn attribute mismatches recalculated maxgrpn"); // # nocov + } + + int nbit=0; + { int tt=ngrp-1; while (tt) { nbit++; tt>>=1; } } // i.e. floor(log2(ngrp-1))+1 + shift = nbit/2; + mask = (1<>shift) + 1; - grp = (int *)R_alloc(grpn, sizeof(int)); - // global grp because the g* functions (inside jsub) share this common memory + grp = (int *)R_alloc(nrow, sizeof(int)); // TODO: use malloc and made this local as not needed globally when all functions here use gather + // maybe better to malloc to avoid R's heap. This grp isn't global, so it doesn't need to be R_alloc const int *restrict fdp = INTEGER(f); if (LENGTH(o)) { @@ -67,16 +88,47 @@ SEXP gforce(SEXP env, SEXP jsub, SEXP o, SEXP f, SEXP l, SEXP irowsArg) { for (int j=0; jmaxgrpn) maxgrpn = grpsize[g]; - } else { - // && INTEGER(tt)[0]!=maxgrpn) error("Internal error: o's maxgrpn mismatches recalculated maxgrpn"); // # nocov - maxgrpn = INTEGER(tt)[0]; + + high = (uint16_t *)R_alloc(nrow, sizeof(uint16_t)); // maybe better to malloc to avoid R's heap, but safer to R_alloc since it's done via eval() + low = (uint16_t *)R_alloc(nrow, sizeof(uint16_t)); + // global ghigh and glow because the g* functions (inside jsub) share this common memory + + gx = (char *)R_alloc(nrow, sizeof(double)); // enough for a copy of one column (or length(irows) if supplied) + + nBatch = getDTthreads()*2; // 2 to reduce last-thread-home. TODO: experiment. The higher this is though, the bigger is counts[] + batchSize = (nrow-1)/nBatch + 1; + lastBatchSize = nrow - (nBatch-1)*batchSize; + counts = (int *)S_alloc(nBatch*highSize, sizeof(int)); // (S_ zeros) TODO: cache-line align and make highSize a multiple of 64 + tmpcounts = (int *)R_alloc(getDTthreads()*highSize, sizeof(int)); + + const int *restrict gp = grp; + #pragma omp parallel for num_threads(getDTthreads()) // schedule(dynamic,1) + for (int b=0; b> shift; + my_counts[w]++; + my_high[i] = (uint16_t)w; // reduce 4 bytes to 2 + } + for (int i=0, cum=0; i> shift; // could use my_high but may as well use my_pg since we need my_pg anyway for the lower bits next too + my_low[my_tmpcounts[w]++] = (uint16_t)(my_pg[i] & mask); + } + // counts is now cumulated within batch (with ending values) and we leave it that way + // memcpy(counts + b*256, myCounts, 256*sizeof(int)); // save cumulate for later, first bucket contains position of next. For ease later in the very last batch. } + oo = INTEGER(o); ff = INTEGER(f); @@ -88,13 +140,49 @@ SEXP gforce(SEXP env, SEXP jsub, SEXP o, SEXP f, SEXP l, SEXP irowsArg) { SET_VECTOR_ELT(ans, 0, tt); UNPROTECT(1); } - ngrp = 0; maxgrpn = 0; irowslen = -1; isunsorted = 0; + ngrp = 0; maxgrpn=0; irowslen = -1; isunsorted = 0; // Rprintf("gforce took %8.3f\n", 1.0*(clock()-start)/CLOCKS_PER_SEC); UNPROTECT(1); return(ans); } +void *gather(void *x, size_t size, bool *anyNA) +{ + if (size==4) { + const int *thisx = x; + //int *restrict thisgx = gx; + #pragma omp parallel for num_threads(getDTthreads()) + for (int b=0; b INT_MAX || s[i] < INT_MIN) { - warning("Group %d summed to more than type 'integer' can hold so the result has been coerced to 'numeric' automatically, for convenience.", i+1); - UNPROTECT(1); - ans = PROTECT(allocVector(REALSXP, ngrp)); - double *tt = REAL(ans); - for (i=0; iINT_MAX || ldsum[i] DBL_MAX) xd[i] = R_PosInf; - else if (s[i] < -DBL_MAX) xd[i] = R_NegInf; - else xd[i] = (double)s[i]; + if (ldsum[i] > DBL_MAX) xd[i] = R_PosInf; + else if (ldsum[i] < -DBL_MAX) xd[i] = R_NegInf; + else xd[i] = (double)ldsum[i]; } } break; default: - free(s); + free(ldsum); error("Type '%s' not supported by GForce sum (gsum). Either add the prefix base::sum(.) or turn off GForce optimization using options(datatable.optimize=1)", type2char(TYPEOF(x))); } - free(s); + free(ldsum); copyMostAttrib(x, ans); UNPROTECT(1); // Rprintf("this gsum took %8.3f\n", 1.0*(clock()-start)/CLOCKS_PER_SEC); @@ -207,7 +314,7 @@ SEXP gmean(SEXP x, SEXP narm) } // na.rm=TRUE. Similar to gsum, but we need to count the non-NA as well for the divisor const int n = (irowslen == -1) ? length(x) : irowslen; - if (grpn != n) error("grpn [%d] != length(x) [%d] in gsum", grpn, n); + if (nrow != n) error("nrow [%d] != length(x) [%d] in gsum", nrow, n); long double *s = calloc(ngrp, sizeof(long double)); if (!s) error("Unable to allocate %d * %d bytes for sum in gmean na.rm=TRUE", ngrp, sizeof(long double)); @@ -263,7 +370,7 @@ SEXP gmin(SEXP x, SEXP narm) int n = (irowslen == -1) ? length(x) : irowslen; //clock_t start = clock(); SEXP ans; - if (grpn != n) error("grpn [%d] != length(x) [%d] in gmin", grpn, n); + if (nrow != n) error("nrow [%d] != length(x) [%d] in gmin", nrow, n); int protecti=0; switch(TYPEOF(x)) { case LGLSXP: case INTSXP: @@ -379,7 +486,7 @@ SEXP gmax(SEXP x, SEXP narm) int n = (irowslen == -1) ? length(x) : irowslen; //clock_t start = clock(); SEXP ans; - if (grpn != n) error("grpn [%d] != length(x) [%d] in gmax", grpn, n); + if (nrow != n) error("nrow [%d] != length(x) [%d] in gmax", nrow, n); // TODO rework gmax in the same way as gmin and remove this *update char *update = (char *)R_alloc(ngrp, sizeof(char)); @@ -524,7 +631,7 @@ SEXP gmedian(SEXP x, SEXP narm) { SEXP ans, sub, klass; void *ptr; int n = (irowslen == -1) ? length(x) : irowslen; - if (grpn != n) error("grpn [%d] != length(x) [%d] in gmedian", grpn, n); + if (nrow != n) error("nrow [%d] != length(x) [%d] in gmedian", nrow, n); switch(TYPEOF(x)) { case REALSXP: klass = getAttrib(x, R_ClassSymbol); @@ -693,7 +800,7 @@ SEXP glast(SEXP x) { R_len_t i,k; int n = (irowslen == -1) ? length(x) : irowslen; SEXP ans; - if (grpn != n) error("grpn [%d] != length(x) [%d] in gtail", grpn, n); + if (nrow != n) error("nrow [%d] != length(x) [%d] in gtail", nrow, n); switch(TYPEOF(x)) { case LGLSXP: ans = PROTECT(allocVector(LGLSXP, ngrp)); @@ -755,7 +862,7 @@ SEXP gfirst(SEXP x) { R_len_t i,k; int n = (irowslen == -1) ? length(x) : irowslen; SEXP ans; - if (grpn != n) error("grpn [%d] != length(x) [%d] in ghead", grpn, n); + if (nrow != n) error("nrow [%d] != length(x) [%d] in ghead", nrow, n); switch(TYPEOF(x)) { case LGLSXP: ans = PROTECT(allocVector(LGLSXP, ngrp)); @@ -826,7 +933,7 @@ SEXP gnthvalue(SEXP x, SEXP valArg) { R_len_t i,k, val=INTEGER(valArg)[0]; int n = (irowslen == -1) ? length(x) : irowslen; SEXP ans; - if (grpn != n) error("grpn [%d] != length(x) [%d] in ghead", grpn, n); + if (nrow != n) error("nrow [%d] != length(x) [%d] in ghead", nrow, n); switch(TYPEOF(x)) { case LGLSXP: ans = PROTECT(allocVector(LGLSXP, ngrp)); @@ -895,7 +1002,7 @@ SEXP gvarsd1(SEXP x, SEXP narm, Rboolean isSD) if (inherits(x, "factor")) error("var/sd is not meaningful for factors."); long double m, s, v; R_len_t i, j, ix, thisgrpsize = 0, n = (irowslen == -1) ? length(x) : irowslen; - if (grpn != n) error("grpn [%d] != length(x) [%d] in gvar", grpn, n); + if (nrow != n) error("nrow [%d] != length(x) [%d] in gvar", nrow, n); SEXP sub, ans = PROTECT(allocVector(REALSXP, ngrp)); Rboolean ans_na; switch(TYPEOF(x)) { @@ -1037,7 +1144,7 @@ SEXP gprod(SEXP x, SEXP narm) int n = (irowslen == -1) ? length(x) : irowslen; //clock_t start = clock(); SEXP ans; - if (grpn != n) error("grpn [%d] != length(x) [%d] in gprod", grpn, n); + if (nrow != n) error("nrow [%d] != length(x) [%d] in gprod", nrow, n); long double *s = malloc(ngrp * sizeof(long double)); if (!s) error("Unable to allocate %d * %d bytes for gprod", ngrp, sizeof(long double)); for (i=0; i Date: Sat, 8 Dec 2018 15:15:01 -0800 Subject: [PATCH 03/10] interim --- src/gsumm.c | 58 +++++++++++++++++++++++++++++++++-------------------- 1 file changed, 36 insertions(+), 22 deletions(-) diff --git a/src/gsumm.c b/src/gsumm.c index f0d33cce4d..d2218dd32c 100644 --- a/src/gsumm.c +++ b/src/gsumm.c @@ -68,6 +68,7 @@ SEXP gforce(SEXP env, SEXP jsub, SEXP o, SEXP f, SEXP l, SEXP irowsArg) { shift = nbit/2; mask = (1<>shift) + 1; + Rprintf("ngrp=%d nbit=%d shift=%d highSize=%d\n", ngrp, nbit, shift, highSize); grp = (int *)R_alloc(nrow, sizeof(int)); // TODO: use malloc and made this local as not needed globally when all functions here use gather // maybe better to malloc to avoid R's heap. This grp isn't global, so it doesn't need to be R_alloc @@ -194,17 +195,21 @@ SEXP gsum(SEXP x, SEXP narmArg) const int n = (irowslen == -1) ? length(x) : irowslen; //clock_t start = clock(); if (nrow != n) error("nrow [%d] != length(x) [%d] in gsum", nrow, n); - long double *ldsum = calloc(ngrp, sizeof(long double)); - if (!ldsum) error("Unable to allocate %d * %d bytes for gsum", ngrp, sizeof(long double)); bool anyNA=false; SEXP ans; switch(TYPEOF(x)) { case LGLSXP: case INTSXP: { // int *xd = INTEGER(x); const int *restrict gx = gather(INTEGER(x), sizeof(int), &anyNA); // TODO: could return anyNA too - #pragma omp parallel for num_threads(getDTthreads()) // schedule(dynamic,1) + ans = PROTECT(allocVector(INTSXP, ngrp)); + int *restrict ansp = INTEGER(ans); + memset(ansp, 0, ngrp*sizeof(int)); + //int64_t *i64sum = calloc(ngrp, sizeof(int64_t)); + //if (!i64sum) error("Unable to allocate %d * %d bytes for gsum i64", ngrp, sizeof(int64_t)); + bool overflow=false; + #pragma omp parallel for num_threads(getDTthreads()) schedule(dynamic,1) for (int h=0; h0 && b>INT_MAX-a) || (a<0 && b0 && b>INT_MAX-a) || (a<0 && bINT_MAX || ldsum[i]INT32_MAX || (i64sum[i]<=NA_INTEGER && i64sum[i]!=INT64_MIN)) stop=true; + } + if (stop) { warning("The sum of an integer column for a group was more than type 'integer' can hold so the result has been coerced to 'numeric' automatically for convenience."); ans = PROTECT(allocVector(REALSXP, ngrp)); double *restrict ansp = REAL(ans); #pragma omp parallel for num_threads(getDTthreads()) for (int i=0; i Date: Mon, 10 Dec 2018 18:51:44 -0800 Subject: [PATCH 04/10] interim --- src/gsumm.c | 109 +++++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 90 insertions(+), 19 deletions(-) diff --git a/src/gsumm.c b/src/gsumm.c index d2218dd32c..90dfedaffc 100644 --- a/src/gsumm.c +++ b/src/gsumm.c @@ -32,8 +32,18 @@ static union { # define SQRTL sqrt #endif +static int nbit(int n) +{ + // returns position of biggest bit; i.e. floor(log2(n))+1 without using fpa + // not needed to be fast. Just a helper function. + int nb=0; + while (n) { nb++; n>>=1; } + return nb; +} + SEXP gforce(SEXP env, SEXP jsub, SEXP o, SEXP f, SEXP l, SEXP irowsArg) { // clock_t start = clock(); + double started = wallclock(); if (TYPEOF(env) != ENVSXP) error("env is not an environment"); // The type of jsub is pretty flexbile in R, so leave checking to eval() below. if (!isInteger(o)) error("o is not an integer vector"); @@ -63,31 +73,87 @@ SEXP gforce(SEXP env, SEXP jsub, SEXP o, SEXP f, SEXP l, SEXP irowsArg) { if (length(tt)==1 && INTEGER(tt)[0]!=maxgrpn) error("Internal error: o's maxgrpn attribute mismatches recalculated maxgrpn"); // # nocov } - int nbit=0; - { int tt=ngrp-1; while (tt) { nbit++; tt>>=1; } } // i.e. floor(log2(ngrp-1))+1 - shift = nbit/2; + int nb = nbit(ngrp-1); + //shift = nb/2; + shift = MAX(nb-8,0); mask = (1<>shift) + 1; - Rprintf("ngrp=%d nbit=%d shift=%d highSize=%d\n", ngrp, nbit, shift, highSize); grp = (int *)R_alloc(nrow, sizeof(int)); // TODO: use malloc and made this local as not needed globally when all functions here use gather // maybe better to malloc to avoid R's heap. This grp isn't global, so it doesn't need to be R_alloc - const int *restrict fdp = INTEGER(f); + + nBatch = MIN((nrow+1)/2, getDTthreads()*2); // 2 to reduce last-thread-home. TODO: experiment. The higher this is though, the bigger is counts[] + batchSize = (nrow-1)/nBatch + 1; + lastBatchSize = nrow - (nBatch-1)*batchSize; + + Rprintf("ngrp=%d nbit=%d shift=%d highSize=%d nBatch=%d batchSize=%d lastBatchSize=%d\n", ngrp, nb, shift, highSize, nBatch, batchSize, lastBatchSize); + + // initial population of g: + #pragma omp parallel for num_threads(getDTthreads()) + for (int g=0; g>shift) + 1; + Rprintf("When assigning grp[o] = g, highSize=%d nb=%d shift=%d nBatch=%d\n", highSize, nb, shift, nBatch); + int *counts = calloc(nBatch*highSize, sizeof(int)); // (S_ zeros) TODO: cache-line align and make highSize a multiple of 64. This +1 is for easier diff later + int *tmpO = malloc(nrow*sizeof(int)); + int *tmpG = malloc(nrow*sizeof(int)); + if (!counts || !tmpO || !tmpG) error("Internal error: Failed to allocate counts, tmpO or tmpG when assigning g in gforce"); + #pragma omp parallel for num_threads(getDTthreads()) // schedule(dynamic,1) + for (int b=0; b> shift; + my_counts[w]++; + } + for (int i=0, cum=0; i> shift; // could use my_high but may as well use my_pg since we need my_pg anyway for the lower bits next too + const int p = my_counts[w]++; + my_tmpO[p] = (int)(my_o[i]-1); + my_tmpG[p] = (int)(my_g[i]); + } } - } else { + Rprintf("gforce assign tmpO and tmpG took %.3f\n", wallclock()-started); started=wallclock(); #pragma omp parallel for num_threads(getDTthreads()) - for (int g=0; g Date: Mon, 10 Dec 2018 22:13:57 -0800 Subject: [PATCH 05/10] assigning g from each batch didn't work quite as well --- src/gsumm.c | 98 +++++++++++++++++++++++++++++------------------------ 1 file changed, 53 insertions(+), 45 deletions(-) diff --git a/src/gsumm.c b/src/gsumm.c index 90dfedaffc..42cffc79ca 100644 --- a/src/gsumm.c +++ b/src/gsumm.c @@ -105,54 +105,62 @@ SEXP gforce(SEXP env, SEXP jsub, SEXP o, SEXP f, SEXP l, SEXP irowsArg) { // const int *elem = odp + fdp[g]-1; // for (int j=0; j>shift) + 1; - Rprintf("When assigning grp[o] = g, highSize=%d nb=%d shift=%d nBatch=%d\n", highSize, nb, shift, nBatch); - int *counts = calloc(nBatch*highSize, sizeof(int)); // (S_ zeros) TODO: cache-line align and make highSize a multiple of 64. This +1 is for easier diff later - int *tmpO = malloc(nrow*sizeof(int)); - int *tmpG = malloc(nrow*sizeof(int)); - if (!counts || !tmpO || !tmpG) error("Internal error: Failed to allocate counts, tmpO or tmpG when assigning g in gforce"); - #pragma omp parallel for num_threads(getDTthreads()) // schedule(dynamic,1) - for (int b=0; b> shift; - my_counts[w]++; - } - for (int i=0, cum=0; i> shift; // could use my_high but may as well use my_pg since we need my_pg anyway for the lower bits next too - const int p = my_counts[w]++; - my_tmpO[p] = (int)(my_o[i]-1); - my_tmpG[p] = (int)(my_g[i]); - } - } - Rprintf("gforce assign tmpO and tmpG took %.3f\n", wallclock()-started); started=wallclock(); - #pragma omp parallel for num_threads(getDTthreads()) - for (int h=0; h>_shift) + 1; + int _nth = MIN(_nBatch, getDTthreads()); + + int *_counts = malloc(_nth*_highSize*sizeof(int)); // TODO: cache-line align and make highSize a multiple of 64. This +1 is for easier diff later + int *_tmpO = malloc(_nth*_batchSize*sizeof(int)); + int *_tmpG = malloc(_nth*_batchSize*sizeof(int)); + int *restrict _newG = malloc(nrow*sizeof(int)); + if (!_counts || !_tmpO || !_tmpG || !_newG) error("Internal error: Failed to allocate counts, tmpO, tmpG or newG when assigning g in gforce"); + Rprintf("When assigning grp[o]=g, _highSize=%d _nb=%d _shift=%d _nBatch=%d, _batchSize=%d _lastBatchSize=%d _nth=%d\n", + _highSize, _nb, _shift, _nBatch, _batchSize, _lastBatchSize, _nth); + + #pragma omp parallel num_threads(_nth) // TODO: could loop through g and avoid needing newG ? + { + const int me = omp_get_thread_num(); + int *restrict my_tmpO = _tmpO + me*_batchSize; + int *restrict my_tmpG = _tmpG + me*_batchSize; + int *restrict my_counts = _counts + me*_highSize; + #pragma omp for // schedule(dynamic,1) + for (int b=0; b<_nBatch; b++) { + memset(my_counts, 0, _highSize*sizeof(int)); + const int howMany = b==_nBatch-1 ? _lastBatchSize : _batchSize; + const int *my_o = op + b*_batchSize; + const int *restrict my_g = grp + b*_batchSize; + for (int i=0; i> _shift; + my_counts[w]++; + } + for (int i=0, cum=0; i<_highSize; i++) { + int tmp = my_counts[i]; + my_counts[i] = cum; + cum += tmp; + } + for (int i=0; i> _shift; // could use my_high but may as well use my_pg since we need my_pg anyway for the lower bits next too + const int p = my_counts[w]++; + my_tmpO[p] = (int)(my_o[i]-1); + my_tmpG[p] = (int)(my_g[i]); + } + for (int i=0; i Date: Mon, 10 Dec 2018 23:00:28 -0800 Subject: [PATCH 06/10] (o,g) pairs better --- src/gsumm.c | 96 +++++++++++++++++++++++------------------------------ 1 file changed, 42 insertions(+), 54 deletions(-) diff --git a/src/gsumm.c b/src/gsumm.c index 42cffc79ca..d7a313133e 100644 --- a/src/gsumm.c +++ b/src/gsumm.c @@ -105,63 +105,51 @@ SEXP gforce(SEXP env, SEXP jsub, SEXP o, SEXP f, SEXP l, SEXP irowsArg) { // const int *elem = odp + fdp[g]-1; // for (int j=0; j>_shift) + 1; - int _nth = MIN(_nBatch, getDTthreads()); - - int *_counts = malloc(_nth*_highSize*sizeof(int)); // TODO: cache-line align and make highSize a multiple of 64. This +1 is for easier diff later - int *_tmpO = malloc(_nth*_batchSize*sizeof(int)); - int *_tmpG = malloc(_nth*_batchSize*sizeof(int)); - int *restrict _newG = malloc(nrow*sizeof(int)); - if (!_counts || !_tmpO || !_tmpG || !_newG) error("Internal error: Failed to allocate counts, tmpO, tmpG or newG when assigning g in gforce"); - Rprintf("When assigning grp[o]=g, _highSize=%d _nb=%d _shift=%d _nBatch=%d, _batchSize=%d _lastBatchSize=%d _nth=%d\n", - _highSize, _nb, _shift, _nBatch, _batchSize, _lastBatchSize, _nth); - - #pragma omp parallel num_threads(_nth) // TODO: could loop through g and avoid needing newG ? - { - const int me = omp_get_thread_num(); - int *restrict my_tmpO = _tmpO + me*_batchSize; - int *restrict my_tmpG = _tmpG + me*_batchSize; - int *restrict my_counts = _counts + me*_highSize; - #pragma omp for // schedule(dynamic,1) - for (int b=0; b<_nBatch; b++) { - memset(my_counts, 0, _highSize*sizeof(int)); - const int howMany = b==_nBatch-1 ? _lastBatchSize : _batchSize; - const int *my_o = op + b*_batchSize; - const int *restrict my_g = grp + b*_batchSize; - for (int i=0; i> _shift; - my_counts[w]++; - } - for (int i=0, cum=0; i<_highSize; i++) { - int tmp = my_counts[i]; - my_counts[i] = cum; - cum += tmp; - } - for (int i=0; i> _shift; // could use my_high but may as well use my_pg since we need my_pg anyway for the lower bits next too - const int p = my_counts[w]++; - my_tmpO[p] = (int)(my_o[i]-1); - my_tmpG[p] = (int)(my_g[i]); - } - for (int i=0; i>shift) + 1; + Rprintf("When assigning grp[o] = g, highSize=%d nb=%d shift=%d nBatch=%d\n", highSize, nb, shift, nBatch); + int *counts = calloc(nBatch*highSize, sizeof(int)); // (S_ zeros) TODO: cache-line align and make highSize a multiple of 64. This +1 is for easier diff later + int *TMP = malloc(nrow*2*sizeof(int)); + if (!counts || !TMP ) error("Internal error: Failed to allocate counts, tmpO or tmpG when assigning g in gforce"); + #pragma omp parallel for num_threads(getDTthreads()) // schedule(dynamic,1) + for (int b=0; b> shift; + my_counts[w]++; + } + for (int i=0, cum=0; i> shift; // could use my_high but may as well use my_pg since we need my_pg anyway for the lower bits next too + int *p = my_tmp + 2*my_counts[w]++; + *p++ = my_o[i]-1; + *p = my_g[i]; + } + } + Rprintf("gforce assign TMP (o,g) pairs took %.3f\n", wallclock()-started); started=wallclock(); + #pragma omp parallel for num_threads(getDTthreads()) + for (int h=0; h Date: Mon, 10 Dec 2018 23:39:48 -0800 Subject: [PATCH 07/10] tidy --- src/gsumm.c | 45 +++++++++++++++++++++------------------------ 1 file changed, 21 insertions(+), 24 deletions(-) diff --git a/src/gsumm.c b/src/gsumm.c index d7a313133e..ff2358e112 100644 --- a/src/gsumm.c +++ b/src/gsumm.c @@ -42,8 +42,7 @@ static int nbit(int n) } SEXP gforce(SEXP env, SEXP jsub, SEXP o, SEXP f, SEXP l, SEXP irowsArg) { - // clock_t start = clock(); - double started = wallclock(); + // double started = wallclock(); if (TYPEOF(env) != ENVSXP) error("env is not an environment"); // The type of jsub is pretty flexbile in R, so leave checking to eval() below. if (!isInteger(o)) error("o is not an integer vector"); @@ -74,44 +73,44 @@ SEXP gforce(SEXP env, SEXP jsub, SEXP o, SEXP f, SEXP l, SEXP irowsArg) { } int nb = nbit(ngrp-1); - //shift = nb/2; - shift = MAX(nb-8,0); + shift = MAX(nb-8,0); //shift = nb/2; mask = (1<>shift) + 1; grp = (int *)R_alloc(nrow, sizeof(int)); // TODO: use malloc and made this local as not needed globally when all functions here use gather // maybe better to malloc to avoid R's heap. This grp isn't global, so it doesn't need to be R_alloc - const int *restrict fdp = INTEGER(f); + const int *restrict fp = INTEGER(f); nBatch = MIN((nrow+1)/2, getDTthreads()*2); // 2 to reduce last-thread-home. TODO: experiment. The higher this is though, the bigger is counts[] batchSize = (nrow-1)/nBatch + 1; lastBatchSize = nrow - (nBatch-1)*batchSize; - Rprintf("ngrp=%d nbit=%d shift=%d highSize=%d nBatch=%d batchSize=%d lastBatchSize=%d\n", ngrp, nb, shift, highSize, nBatch, batchSize, lastBatchSize); + //Rprintf("ngrp=%d nbit=%d shift=%d highSize=%d nBatch=%d batchSize=%d lastBatchSize=%d\n", ngrp, nb, shift, highSize, nBatch, batchSize, lastBatchSize); // initial population of g: #pragma omp parallel for num_threads(getDTthreads()) for (int g=0; g>shift) + 1; - Rprintf("When assigning grp[o] = g, highSize=%d nb=%d shift=%d nBatch=%d\n", highSize, nb, shift, nBatch); - int *counts = calloc(nBatch*highSize, sizeof(int)); // (S_ zeros) TODO: cache-line align and make highSize a multiple of 64. This +1 is for easier diff later + //Rprintf("When assigning grp[o] = g, highSize=%d nb=%d shift=%d nBatch=%d\n", highSize, nb, shift, nBatch); + int *counts = calloc(nBatch*highSize, sizeof(int)); // TODO: cache-line align and make highSize a multiple of 64 int *TMP = malloc(nrow*2*sizeof(int)); - if (!counts || !TMP ) error("Internal error: Failed to allocate counts, tmpO or tmpG when assigning g in gforce"); + if (!counts || !TMP ) error("Internal error: Failed to allocate counts or TMP when assigning g in gforce"); #pragma omp parallel for num_threads(getDTthreads()) // schedule(dynamic,1) for (int b=0; b Date: Tue, 11 Dec 2018 01:08:42 -0800 Subject: [PATCH 08/10] gsum real --- CRAN_Release.cmd | 2 + src/gsumm.c | 202 +++++++++++++++++++++++++++++++---------------- 2 files changed, 138 insertions(+), 66 deletions(-) diff --git a/CRAN_Release.cmd b/CRAN_Release.cmd index e85e9a59fd..ada6baf4c6 100644 --- a/CRAN_Release.cmd +++ b/CRAN_Release.cmd @@ -236,6 +236,8 @@ print(Sys.time()); require(data.table); print(Sys.time()); started.at<-proc.time # Investigated and ignore : # Tests 648 and 1262 (see their comments) have single precision issues under valgrind that don't occur on CRAN, even Solaris. +# Old comment from gsumm.c ... // long double usage here used to result in test 648 failing when run under valgrind + // http://valgrind.org/docs/manual/manual-core.html#manual-core.limits" # Ignore all "set address range perms" warnings : # http://stackoverflow.com/questions/13558067/what-does-this-valgrind-warning-mean-warning-set-address-range-perms # Ignore heap summaries around test 1705 and 1707/1708 due to the fork() test opening/closing, I guess. diff --git a/src/gsumm.c b/src/gsumm.c index ff2358e112..d400bac63d 100644 --- a/src/gsumm.c +++ b/src/gsumm.c @@ -208,12 +208,12 @@ SEXP gforce(SEXP env, SEXP jsub, SEXP o, SEXP f, SEXP l, SEXP irowsArg) { return(ans); } -void *gather(void *x, size_t size, bool *anyNA) +void *gather(SEXP x, bool *anyNA) { //double started = wallclock(); - if (size==4) { - const int *thisx = x; - //int *restrict thisgx = gx; + switch (TYPEOF(x)) { + case LGLSXP: case INTSXP: { + const int *restrict thisx = INTEGER(x); #pragma omp parallel for num_threads(getDTthreads()) for (int b=0; b0 && b>INT_MAX-a) || (a<0 && b0 && b>INT_MAX-a) || (a<0 && bINT32_MAX || (i64sum[i]<=NA_INTEGER && i64sum[i]!=INT64_MIN)) stop=true; - } - if (stop) { + if (overflow) { + UNPROTECT(1); // discard the result with overflow warning("The sum of an integer column for a group was more than type 'integer' can hold so the result has been coerced to 'numeric' automatically for convenience."); ans = PROTECT(allocVector(REALSXP, ngrp)); double *restrict ansp = REAL(ans); + memset(ansp, 0, ngrp*sizeof(double)); #pragma omp parallel for num_threads(getDTthreads()) - for (int i=0; i DBL_MAX) xd[i] = R_PosInf; - else if (ldsum[i] < -DBL_MAX) xd[i] = R_NegInf; - else xd[i] = (double)ldsum[i]; - } - free(ldsum); } break; default: error("Type '%s' not supported by GForce sum (gsum). Either add the prefix base::sum(.) or turn off GForce optimization using options(datatable.optimize=1)", type2char(TYPEOF(x))); From 313e90d8be496111356d90466327dcccd4e69f64 Mon Sep 17 00:00:00 2001 From: mattdowle Date: Tue, 11 Dec 2018 03:04:14 -0800 Subject: [PATCH 09/10] coverage --- inst/tests/tests.Rraw | 8 ++++++++ src/gsumm.c | 6 +++--- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/inst/tests/tests.Rraw b/inst/tests/tests.Rraw index 759c41c597..b9bfee365c 100644 --- a/inst/tests/tests.Rraw +++ b/inst/tests/tests.Rraw @@ -13029,6 +13029,14 @@ test(1967.75, x[ , .(v = sum(v)), by = i1:i4], x[-10L]) test(1967.76, x[1:5, sum(v), by = list(i5 = 1:5 %% 2L), verbose = TRUE], data.table(i5 = 1:0, V1 = c(0, 0)), output = 'i clause present but columns used in by not detected') +# gforce integer overflow coerce to double +DT = data.table(A=1:5, B=-3i, C=2147483647L) +test(1968.1, DT[, sum(B), by=A%%2L], error="Type 'complex' not supported by GForce sum (gsum). Either add the") +test(1968.2, storage.mode(DT$C), "integer") +test(1968.3, DT[, sum(C), by=A%%2L], data.table(A=c(1L,0L), V1=c(6442450941, 4294967294)), + warning="sum.*integer column.*more than type 'integer' can hold.*coerced to 'numeric'") + + ################################### # Add new tests above this line # ################################### diff --git a/src/gsumm.c b/src/gsumm.c index d400bac63d..0641af5d14 100644 --- a/src/gsumm.c +++ b/src/gsumm.c @@ -56,7 +56,7 @@ SEXP gforce(SEXP env, SEXP jsub, SEXP o, SEXP f, SEXP l, SEXP irowsArg) { irows = INTEGER(irowsArg); irowslen = LENGTH(irowsArg); } - else error("irowsArg is neither an integer vector nor NULL"); + else error("irowsArg is neither an integer vector nor NULL"); // # nocov ngrp = LENGTH(l); if (LENGTH(f) != ngrp) error("length(f)=%d != length(l)=%d", LENGTH(f), ngrp); nrow=0; @@ -298,7 +298,7 @@ void *gather(SEXP x, bool *anyNA) } } break; default : - error("gather implemented for INTSXP and REALSXP but not '%s'", type2char(TYPEOF(x))); + error("gather implemented for INTSXP and REALSXP but not '%s'", type2char(TYPEOF(x))); // # nocov } //Rprintf("gather took %.3fs\n", wallclock()-started); return gx; @@ -385,7 +385,7 @@ SEXP gsum(SEXP x, SEXP narmArg) if (!narm) _ans[my_low[i]]=NA_REAL; continue; } - _ans[my_low[i]] += b; // let NA_REAL propagate + _ans[my_low[i]] += elem; // let NA_REAL propagate } } } From 0a955e2328d8de710a801482dfdd785e012f3d44 Mon Sep 17 00:00:00 2001 From: mattdowle Date: Tue, 11 Dec 2018 03:30:45 -0800 Subject: [PATCH 10/10] coverage --- inst/tests/tests.Rraw | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/inst/tests/tests.Rraw b/inst/tests/tests.Rraw index b9bfee365c..d23547b937 100644 --- a/inst/tests/tests.Rraw +++ b/inst/tests/tests.Rraw @@ -13035,6 +13035,13 @@ test(1968.1, DT[, sum(B), by=A%%2L], error="Type 'complex' not supported by GFor test(1968.2, storage.mode(DT$C), "integer") test(1968.3, DT[, sum(C), by=A%%2L], data.table(A=c(1L,0L), V1=c(6442450941, 4294967294)), warning="sum.*integer column.*more than type 'integer' can hold.*coerced to 'numeric'") +DT[3,C:=NA] +test(1968.4, DT[, sum(C), by=A%%2L], data.table(A=c(1L,0L), V1=c(NA, 4294967294)), warning="coerced to 'numeric'") +test(1968.5, DT[, sum(C,na.rm=TRUE), by=A%%2L], data.table(A=c(1L,0L), V1=c(4294967294, 4294967294)), warning="coerced to 'numeric'") +DT[4,C:=NA] +test(1968.6, DT[, sum(C,na.rm=TRUE), by=A%%2L], data.table(A=c(1L,0L), V1=c(4294967294, 2147483647)), warning="coerced to 'numeric'") +DT[2,C:=NA] +test(1968.7, DT[, sum(C,na.rm=TRUE), by=A%%2L], data.table(A=c(1L,0L), V1=c(4294967294, 0)), warning="coerced to 'numeric'") ###################################