parallel aggregations #2223

@alexWhitworth

Description

In response to a prior discussion on Stack Overflow, I decided to test the impact of parallel processing on aggregation, which occurs frequently in my work.

I've done some tests per @mattdowle's prior mantra of Rprof, Rprof, Rprof.

What I find is that the payoff from parallelizing is context dependent, but it can be significant. Depending on the test operations (e.g. foo below, which can be customized) and the number of cores used (I try both 8 and 24), I get different results.

Results:

  1. Using 8 cores, I see a 21% improvement from parallelization in this example.
  2. Using 24 cores, I see a 14% improvement.

I also look at some real-world (non-shareable) data and operations, which show a larger improvement from parallelizing with 24 cores (33% and 25%, in two different tests). Edit (May 2018): a new set of real-world example cases shows closer to 85% improvement from parallel operations with 1000 groups.

R> sessionInfo() # 24-core machine:
R version 3.3.2 (2016-10-31)
Platform: x86_64-pc-linux-gnu (64-bit)
Running under: CentOS Linux 7 (Core)

attached base packages:
[1] parallel  stats     graphics  grDevices utils     datasets  methods
[8] base

other attached packages:
[1] microbenchmark_1.4-2.1 stringi_1.1.2          data.table_1.10.4

R> sessionInfo() # 8-core machine:
R version 3.3.2 (2016-10-31)
Platform: x86_64-apple-darwin13.4.0 (64-bit)
Running under: macOS Sierra 10.12.4

attached base packages:
[1] parallel  stats     graphics  grDevices utils     datasets  methods   base     

other attached packages:
[1] microbenchmark_1.4-2.1 stringi_1.1.5          data.table_1.10.4     

Example below:

library(data.table)
library(stringi)
library(microbenchmark)

set.seed(7623452L)
my_grps <- stringi::stri_rand_strings(n= 5000, length= 10)

my_mat <- matrix(rnorm(1e5), ncol= 20)
dt <- data.table(grps= rep(my_grps, each= 20), my_mat)

foo <- function(dt) {
  nr <- nrow(dt)
  idx <- sample.int(nr, 1)
  
  ## subsetting first makes a copy, so `:=` below doesn't hit the .SD lock;
  ## the new columns are kept on the copy that is returned
  dt2 <- dt[idx,]
  dt2[, `:=` (
    new_var1= V1 / V2,
    new_var2= V4 * V3 / V10,
    new_var3= sum(V12),
    new_var4= ifelse(V10 > 0, V11 / V13, 1),
    new_var5= ifelse(V9 < 0, V8 / V18, 1)
  )]
  
  return(dt2)
}

split_df <- function(d, var) {
  ## split d into a named list of per-group tables, keyed by column `var`
  base::split(d, get(var, as.environment(d)))
}
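
For reference, split_df returns a named list with one data.table per group; on the example data:

grp_list <- split_df(dt, "grps")
length(grp_list)  # 5000 groups of 20 rows each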

foo2 <- function(dt) {
  dt2 <- split_df(dt, "grps")
  
  require(parallel)
  ## no more workers than groups, capped at the machine's core count
  cl <- parallel::makeCluster(min(length(dt2), parallel::detectCores()))
  clusterExport(cl, varlist= "foo")
  clusterExport(cl, varlist= "dt2", envir = environment())
  clusterEvalQ(cl, library("data.table"))
  
  dt2 <- parallel::parLapply(cl, X= dt2, fun= foo)
  
  parallel::stopCluster(cl)
  return(rbindlist(dt2))
}
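
As an aside (not part of the original benchmark): on Unix-alikes, a fork-based variant avoids the clusterExport() copies entirely, which is worth trying when export overhead dominates the parallel setup cost. A minimal sketch, assuming the same foo and split_df as above:

foo3 <- function(dt, cores= parallel::detectCores()) {
  dt2 <- split_df(dt, "grps")
  ## forked workers inherit foo and the loaded data.table namespace,
  ## so no export step is needed (mc.cores > 1 is Unix-only)
  rbindlist(parallel::mclapply(dt2, foo, mc.cores= cores))
}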

print(parallel::detectCores()) # 8

microbenchmark(
  serial= dt[,foo(.SD), by= "grps"],
  parallel= foo2(dt),
  times= 10L
)

Unit: seconds
     expr      min       lq     mean   median       uq      max neval cld
   serial 6.962188 7.312666 8.433159 8.758493 9.287294 9.605387    10   b
 parallel 6.563674 6.648749 6.976669 6.937556 7.102689 7.654257    10  a 

print(parallel::detectCores()) # 24

Unit: seconds
     expr       min        lq     mean   median       uq      max neval cld
   serial  9.014247  9.804112 12.17843 13.17508 13.56914 14.13133    10   a
 parallel 10.732106 10.957608 11.17652 11.06654 11.30386 12.28353    10   a
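
Per the May 2018 edit above, the payoff also depends on the number of groups. A hedged sketch for probing that (make_dt is a hypothetical helper mirroring the data setup above, not code from the original tests):

make_dt <- function(n_grps, rows_per_grp= 20, n_cols= 20) {
  grps <- stringi::stri_rand_strings(n= n_grps, length= 10)
  mat  <- matrix(rnorm(n_grps * rows_per_grp * n_cols), ncol= n_cols)
  data.table(grps= rep(grps, each= rows_per_grp), mat)
}

for (n in c(100, 1000, 5000)) {
  d <- make_dt(n)
  print(microbenchmark(serial= d[, foo(.SD), by= "grps"],
                       parallel= foo2(d), times= 3L))
}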

Profiling:

We can use the example above to respond more directly to @mattdowle's original comment about profiling.

The result: the majority of compute time is spent in base, not in data.table. data.table operations themselves are, as expected, exceptionally fast. While some might argue this is evidence that there is no need for parallelism within data.table, I posit that this workflow and operation set is not atypical. That is, my strong suspicion is that most large data.table aggregations involve a substantial amount of non-data.table code, and that this correlates with development/production use as opposed to interactive use. I therefore conclude that parallelism within data.table would be valuable for large aggregations.

library(profr)

prof_list <- replicate(100, profr::profr(dt[,foo(.SD), by= "grps"], interval = 0.002),
                       simplify = FALSE)

pkg_timing <- fun_timing <- vector("list", length= 100)
## per replicate: total profiled time keyed by source::function, and by source package
for (i in 1:100) {
  fun_timing[[i]] <- tapply(prof_list[[i]]$time, paste(prof_list[[i]]$source, prof_list[[i]]$f, sep= "::"), sum)
  pkg_timing[[i]] <- tapply(prof_list[[i]]$time, prof_list[[i]]$source, sum)
}

sort(sapply(fun_timing, sum)) #  no large outliers

fun_timing2 <- rbindlist(lapply(fun_timing, function(x) {
  ret <- data.table(fun= names(x), time= x)
  ret[, pct_time := time / sum(time)]
  return(ret)
}))

pkg_timing2 <- rbindlist(lapply(pkg_timing, function(x) {
  ret <- data.table(pkg= names(x), time= x)
  ret[, pct_time := time / sum(time)]
  return(ret)
}))

fun_timing2[, .(total_time= sum(time),
                avg_time= mean(time),
                avg_pct= round(mean(pct_time), 4)), by= "fun"][
  order(avg_time, decreasing = TRUE),][1:10,]

pkg_timing2[, .(total_time= sum(time),
                avg_time= mean(time),
                avg_pct= round(mean(pct_time), 4)), by= "pkg"][
  order(avg_time, decreasing = TRUE),]

Results:

                      fun total_time avg_time avg_pct
 1:               base::[    670.362  6.70362  0.2694
 2:      NA::[.data.table    667.350  6.67350  0.2682
 3:       .GlobalEnv::foo    335.784  3.35784  0.1349
 4:              base::[[    163.044  1.63044  0.0655
 5:   base::[[.data.frame    133.790  1.33790  0.0537
 6:            base::%in%    120.512  1.20512  0.0484
 7:        base::sys.call     86.846  0.86846  0.0348
 8: NA::replace_dot_alias     27.824  0.27824  0.0112
 9:           base::which     23.536  0.23536  0.0095
10:          base::sapply     22.080  0.22080  0.0089

          pkg total_time avg_time avg_pct
1:       base   1397.770 13.97770  0.7938
2: .GlobalEnv    335.784  3.35784  0.1908
3: data.table     27.262  0.27262  0.0155
