Find the most frequent words in a text
# Vector of words from all six books
words <- janeausten_words()
# Most frequent "a"-word that is at least 5 chars long
max_frequency(letter = "a", words = words, min_length = 5)
///////////////////////////
# Vector of words from all six books
words <- janeausten_words()
# Most frequent "a"-word that is at least 5 chars long
max_frequency(letter = "a", words = words, min_length = 5)
# Partitioning
result <- lapply(letters, max_frequency,
                 words = words, min_length = 5) %>% unlist()
//////////////////////////
# Vector of words from all six books
words <- janeausten_words()
# Most frequent "a"-word that is at least 5 chars long
max_frequency(letter = "a", words = words, min_length = 5)
# Partitioning
result <- lapply(letters, max_frequency,
                 words = words, min_length = 5) %>% unlist()
# Barplot of result
barplot(result, las = 2)
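The helpers janeausten_words(), select_words() and max_frequency() are provided by the course and not shown here; a minimal sketch of what they might look like, assuming the janeaustenr and stringr packages:
library(janeaustenr)
library(stringr)
janeausten_words <- function() {
  # Collapse the text of all six books into one vector of lowercase words
  unlist(str_extract_all(str_to_lower(austen_books()$text), "[a-z]+"))
}
select_words <- function(words, letter, min_length = 1) {
  # Words starting with `letter` that are at least min_length characters long
  words[str_starts(words, letter) & str_length(words) >= min_length]
}
max_frequency <- function(letter, words, min_length = 1) {
  # Frequency of the most common qualifying word, named by the word itself
  counts <- table(select_words(words, letter, min_length))
  counts[which.max(counts)]
}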
A simple embarrassingly parallel application
# Complete the function definition
mean_of_rnorm <- function(n) {
  # Generate normally distributed random numbers
  random_numbers <- rnorm(n)
  # Calculate the mean of the random numbers
  mean(random_numbers)
}
# Try it out
mean_of_rnorm(100)
//////////////////////////
# From previous step
mean_of_rnorm <- function(n) {
  random_numbers <- rnorm(n)
  mean(random_numbers)
}
# Create a vector to store the results
result <- rep(NA, n_replicates)
# Set the random seed to 123
set.seed(123)
# Set up a for loop with iter from 1 to n_replicates
for(iter in seq_len(n_replicates)) {
  # Call mean_of_rnorm with n_numbers_per_replicate
  result[iter] <- mean_of_rnorm(n_numbers_per_replicate)
}
# View the result
hist(result)
//////////////////////////
# From previous step
mean_of_rnorm <- function(n) {
  random_numbers <- rnorm(n)
  mean(random_numbers)
}
# Repeat n_numbers_per_replicate, n_replicates times
n <- rep(n_numbers_per_replicate, n_replicates)
# Call mean_of_rnorm() repeatedly using sapply()
result <- sapply(
  # The vectorized argument to pass
  n,
  # The function to call
  mean_of_rnorm
)
# View the results
hist(result)
Probabilistic projection of migration
# Function definition of ar1_multiple_blocks_of_trajectories()
ar1_multiple_blocks_of_trajectories <- function(ids, ...) {
  # Call ar1_block_of_trajectories() for each id in ids
  trajectories_by_block <- lapply(ids, ar1_block_of_trajectories, ...)
  # rbind results
  do.call(rbind, trajectories_by_block)
}
/////////////////////////
# From previous step
ar1_multiple_blocks_of_trajectories <- function(ids, ...) {
  trajectories_by_block <- lapply(ids, ar1_block_of_trajectories, ...)
  do.call(rbind, trajectories_by_block)
}
# Create a sequence from 1 to number of blocks
traj_ids <- seq_len(nrow(ar1est))
//////////////////////////
# From previous step
ar1_multiple_blocks_of_trajectories <- function(ids, ...) {
  trajectories_by_block <- lapply(ids, ar1_block_of_trajectories, ...)
  do.call(rbind, trajectories_by_block)
}
# Create a sequence from 1 to number of blocks
traj_ids <- seq_len(nrow(ar1est))
# Generate trajectories for all rows of the estimation dataset
trajs <- ar1_multiple_blocks_of_trajectories(
  ids = traj_ids, rate0 = 0.015,
  block_size = 10, traj_len = 15
)
/////////////////////////////
# From previous step
ar1_multiple_blocks_of_trajectories <- function(ids, ...) {
  trajectories_by_block <- lapply(ids, ar1_block_of_trajectories, ...)
  do.call(rbind, trajectories_by_block)
}
# Create a sequence from 1 to number of blocks
traj_ids <- seq_len(nrow(ar1est))
# Generate trajectories for all rows of the estimation dataset
trajs <- ar1_multiple_blocks_of_trajectories(
  ids = traj_ids, rate0 = 0.015,
  block_size = 10, traj_len = 15
)
# Show results
show_migration(trajs)
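ar1_one_trajectory() and ar1_block_of_trajectories() are course helpers built on the estimation dataset ar1est; a rough sketch of the AR(1) simulation, assuming each row of ar1est holds parameters named mu, phi and sigma (column names hypothetical):
ar1_one_trajectory <- function(est, rate0, traj_len = 15) {
  # Simulate one AR(1) trajectory of length traj_len, starting from rate0
  traj <- numeric(traj_len)
  rate <- rate0
  for (time in seq_len(traj_len)) {
    rate <- est["mu"] + est["phi"] * (rate - est["mu"]) + rnorm(1, sd = est["sigma"])
    traj[time] <- rate
  }
  traj
}
ar1_block_of_trajectories <- function(id, rate0 = 0.015, block_size = 10, traj_len = 15) {
  # Simulate block_size trajectories, all using the id-th row of ar1est
  trajs <- matrix(NA, nrow = block_size, ncol = traj_len)
  for (i in seq_len(block_size))
    trajs[i, ] <- ar1_one_trajectory(unlist(ar1est[id, ]), rate0, traj_len)
  trajs
}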
Passing arguments via clusterApply()
# Load parallel
library(parallel)
# How many physical cores are available?
ncores <- detectCores(logical = FALSE)
# How many random numbers to generate
n <- ncores:1
/////////////////////
# From previous step
library(parallel)
ncores <- detectCores(logical = FALSE)
n <- ncores:1
# Use lapply to call rnorm for each n,
# setting mean to 10 and sd to 2
lapply(n, rnorm, mean = 10, sd = 2)
/////////////////////
# From previous step
library(parallel)
ncores <- detectCores(logical = FALSE)
n <- ncores:1
# Create a cluster
cl <- makeCluster(ncores)
# Use clusterApply to call rnorm for each n in parallel,
# again setting mean to 10 and sd to 2
clusterApply(cl, n, rnorm, mean = 10, sd = 2)
# Stop the cluster
stopCluster(cl)
Sum in parallel
# Evaluate partial sums in parallel
part_sums <- clusterApply(cl, x = c(1, 51),
                          fun = function(x) sum(x:(x + 49)))
# Total sum
total <- sum(unlist(part_sums))
# Check for correctness
total == sum(1:100)
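The two hard-coded offsets (1 and 51) only work for a cluster of size 2; splitIndices() from parallel generalizes the same partial-sum idea to any cluster size, e.g.:
# Split 1:100 into one chunk of indices per worker
chunks <- splitIndices(100, length(cl))
# Each partial sum is the sum of one chunk's indices
part_sums <- clusterApply(cl, chunks, sum)
sum(unlist(part_sums)) == sum(1:100)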
More tasks than workers
# Create a cluster and set parameters
cl <- makeCluster(2)
n_replicates <- 50
n_numbers_per_replicate <- 10000
# Parallel evaluation on n_numbers_per_replicate, n_replicates times
means <- clusterApply(cl,
                      x = rep(n_numbers_per_replicate, n_replicates),
                      fun = mean_of_rnorm)
# View results as histogram
hist(unlist(means))
Exploring the cluster object
# Load the parallel package
library(parallel)
# Make a cluster with 4 nodes
cl <- makeCluster(4)
# Investigate the structure of cl
str(cl)
///////////////////////
# Load the parallel package
library(parallel)
# Make a cluster with 4 nodes
cl <- makeCluster(4)
# Investigate the structure of cl
str(cl)
# What is the process ID of the workers?
clusterCall(cl, Sys.getpid)
# Stop the cluster
stopCluster(cl)
Socket vs. Fork
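print_global_var() is defined by the course before these steps; presumably just:
print_global_var <- function() print(a_global_var)
Socket workers start with empty workspaces, so the first call below fails unless a_global_var is exported; fork workers inherit the master's workspace.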
# A global variable is defined
a_global_var <- "before"
# Create a socket cluster with 2 nodes
cl_sock <- makeCluster(2, type = "PSOCK")
# Evaluate the print function on each node
clusterCall(cl_sock, print_global_var)
# Stop the cluster
stopCluster(cl_sock)
////////////////////
# A global variable is defined
a_global_var <- "before"
# Create a fork cluster with 2 nodes
cl_fork <- makeCluster(2, type = "FORK")
# Evaluate the print function on each node
clusterCall(cl_fork, print_global_var)
# Stop the cluster
stopCluster(cl_fork)
////////////////////////
# A global variable is defined
a_global_var <- "before"
# Create a fork cluster with 2 nodes
cl_fork <- makeCluster(2, type = "FORK")
# Change the global var to "after"
a_global_var <- "after"
# Evaluate the print fun on each node
clusterCall(cl_fork, print_global_var)
# Stop the cluster
stopCluster(cl_fork)
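Note that fork workers copy the master's workspace when the cluster is created, so they still print "before" even though the master has since changed a_global_var to "after".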
//////////////////////////
Benchmarking setup
# Wrap this code into a function
mean_of_rnorm_sequentially <- function(n_numbers_per_replicate, n_replicates) {
  n <- rep(n_numbers_per_replicate, n_replicates)
  lapply(n, mean_of_rnorm)
}
# Call it to try it
mean_of_rnorm_sequentially(1000, 5)
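mean_of_rnorm_in_parallel(), benchmarked in the next exercise, is defined by the course analogously; a minimal sketch, assuming a cluster object cl is already running:
mean_of_rnorm_in_parallel <- function(n_numbers_per_replicate, n_replicates) {
  n <- rep(n_numbers_per_replicate, n_replicates)
  clusterApply(cl, n, mean_of_rnorm)
}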
Task size matters
# Set numbers per replicate to 5 million
n_numbers_per_replicate <- 5e6
# Set number of replicates to 4
n_replicates <- 4
# Run a microbenchmark
microbenchmark(
  # Call mean_of_rnorm_sequentially()
  mean_of_rnorm_sequentially(n_numbers_per_replicate, n_replicates),
  # Call mean_of_rnorm_in_parallel()
  mean_of_rnorm_in_parallel(n_numbers_per_replicate, n_replicates),
  times = 1,
  unit = "s"
)
///////////////////////
# Change the numbers per replicate to 100
n_numbers_per_replicate <- 100
# Change number of replicates to 100
n_replicates <- 100
# Rerun the microbenchmark
microbenchmark(
  mean_of_rnorm_sequentially(n_numbers_per_replicate, n_replicates),
  mean_of_rnorm_in_parallel(n_numbers_per_replicate, n_replicates),
  times = 1,
  unit = "s"
)
Loading package on nodes
# Pre-defined myrdnorm
myrdnorm <- function(n, mean = 0, sd = 1)
  rdnorm(n, mean = mean, sd = sd)
# Parameters
n_numbers_per_replicate <- 1000
n_replicates <- 20
# Repeat n_numbers_per_replicate, n_replicates times
n <- rep(n_numbers_per_replicate, n_replicates)
# Load extraDistr on master
library(extraDistr)
# Run myrdnorm in parallel. This should fail!
res <- clusterApply(cl, n, myrdnorm)
////////////////////////////////
# From previous step
myrdnorm <- function(n, mean = 0, sd = 1)
  rdnorm(n, mean = mean, sd = sd)
n_numbers_per_replicate <- 1000
n_replicates <- 20
n <- rep(n_numbers_per_replicate, n_replicates)
# Load extraDistr on master
library(extraDistr)
# Load extraDistr on all workers
clusterEvalQ(cl, library(extraDistr))
# Run myrdnorm in parallel. It should work now!
res <- clusterApply(cl, n, myrdnorm)
# Plot the result
plot(table(unlist(res)))
Setting global variables
# rdnorm(), but using global variables
myrdnorm <- function(n) {
  rdnorm(n, mean = mean, sd = sd)
}
# Set mean to 10, globally
mean <- 10
# Set sd to 5, globally
sd <- 5
# Generate 1000 numbers with myrdnorm()
myrdnorm(1000)
///////////////////
# From previous step
myrdnorm <- function(n) {
  rdnorm(n, mean = mean, sd = sd)
}
# Set number of numbers to generate
n <- rep(1000, 20)
# Run an expression on each worker
clusterEvalQ(
  cl, {
    # Load extraDistr
    library(extraDistr)
    # Set mean to 10
    mean <- 10
    # Set sd to 5
    sd <- 5
  })
# Run myrdnorm in parallel
res <- clusterApply(cl, n, myrdnorm)
# Plot the results
plot(table(unlist(res)))
Exporting global objects
# Set global objects on master: mean to 20, sd to 10
mean <- 20
sd <- 10
# Load extraDistr on workers
clusterEvalQ(cl, library(extraDistr))
# Export global objects to workers
clusterExport(cl, c("mean", "sd"))
# Run myrdnorm in parallel
res <- clusterApply(cl, n, myrdnorm)
# Plot the results
plot(table(unlist(res)))
Passing data as arguments
# Select words beginning with "v", at least 10 letters long
words_v10 <- select_words(words, letter = "v", min_length = 10)
# Get the unique words
unique(words_v10)
////////////////
# Generate 2 random groups
groups <- sample(x = 2, size = length(words), replace = TRUE)
# See those groups
head(groups, 20)
# Split words into groups
split_words <- split(words, groups)
////////////////
# From previous step
groups <- sample(2, length(words), replace = TRUE)
split_words <- split(words, groups)
# Apply select_words() to each element of split_words in parallel
res <- clusterApply(cl, split_words, select_words,
                    letter = "v", min_length = 10)
# Flatten the result
words_v10 <- unlist(res)
# Get the unique words
unique(words_v10)
Chunking the migration application on the worker's side
# Export data and functions
clusterExport(cl, c("ar1est", "ar1_one_trajectory",
                    "ar1_block_of_trajectories"))
////////////////////
# Export data and functions
clusterExport(cl, c("ar1est", "ar1_one_trajectory",
                    "ar1_block_of_trajectories"))
# Process ar1_multiple_blocks_of_trajectories in parallel
res <- clusterApply(cl,
                    1:nrow(ar1est),
                    fun = ar1_multiple_blocks_of_trajectories)
////////////////////////////
# Export data and functions
clusterExport(cl, c("ar1est", "ar1_one_trajectory",
                    "ar1_block_of_trajectories"))
# Process ar1_multiple_blocks_of_trajectories in parallel
res <- clusterApply(cl,
                    1:nrow(ar1est),
                    fun = ar1_multiple_blocks_of_trajectories)
# Combine results into a matrix and show results
trajs <- do.call(rbind, res)
show_migration(trajs)
Alternative chunking
# Split task into 5 chunks
ind <- splitIndices(nrow(ar1est), 5)
# Process ar1_multiple_blocks_of_trajectories in parallel
res <- clusterApply(cl, ind, ar1_multiple_blocks_of_trajectories)
# Compare the structure of the results
str(res)
str(res_prev)
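res is now a list of 5 matrices (one per chunk of rows), whereas res_prev, presumably the one-task-per-row result of the previous exercise, has nrow(ar1est) elements.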
Word frequency with foreach
# Load the package
library(foreach)
# foreach() %do% construct
result <- foreach(let = letters, .combine = c) %do%
  max_frequency(let, words = words, min_length = 5)
# Plot results
barplot(result, las = 2)
Multiple iterators in word frequency
# foreach() %do% construct with 2 iterators
result <- foreach(let = letters, n = c(rep(2, 13), rep(6, 13)),
                  .combine = c) %do%
  max_frequency(let, words = words, min_length = n)
# Plot results
barplot(result, las = 2)
Using doParallel
# Register doParallel with 3 cores
registerDoParallel(cores = 3)
# foreach() %dopar% loop
res <- foreach(r = rep(1000, 100), .combine = rbind,
               .packages = "extraDistr") %dopar% myrdnorm(r)
# Dimensions of res
dim_res <- dim(res)
Word frequency with doParallel
# Function for doParallel foreach
freq_doPar <- function(cores, min_length = 5) {
  # Register a cluster of size cores
  registerDoParallel(cores = cores)
  # foreach loop
  foreach(let = chars, .combine = c,
          .export = c("max_frequency", "select_words", "words"),
          .packages = c("janeaustenr", "stringr")) %dopar%
    max_frequency(let, words = words, min_length = min_length)
}
# Run on 2 cores
freq_doPar(2)
Word frequency with doFuture and benchmarking
# Function for doFuture foreach
freq_doFut <- function(cores, min_length = 5) {
  # Register and set plan
  registerDoFuture()
  plan(cluster, workers = cores)
  # foreach loop
  foreach(let = chars, .combine = c) %dopar%
    max_frequency(let, words = words, min_length = min_length)
}
///////////////////
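freq_seq(), the sequential baseline in the benchmark below, is presumably a plain %do% version of the same loop (chars and words predefined as before):
freq_seq <- function(min_length = 5) {
  foreach(let = chars, .combine = c) %do%
    max_frequency(let, words = words, min_length = min_length)
}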
# Benchmark
microbenchmark(freq_seq(min_length),
               freq_doPar(cores, min_length),
               freq_doFut(cores, min_length),
               times = 1)
Word frequency with future.apply
# Main function
freq_fapply <- function(words, chars = letters, min_length = 5) {
  unlist(
    future_lapply(chars, max_frequency, words = words,
                  min_length = min_length)
  )
}
# Extract words
words <- extract_words_from_text(obama_speech)
# Call the main function
res <- freq_fapply(words)
# Plot results
barplot(res, las = 2)
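extract_words_from_text() is another course helper (obama_speech is a course-provided character vector); a plausible sketch using stringr:
extract_words_from_text <- function(text) {
  # Lowercase the text and pull out runs of letters as words
  unlist(str_extract_all(str_to_lower(text), "[a-z]+"))
}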
Planning future
# multicore function
fapply_mc <- function(cores = 2, ...) {
  # future plan
  plan(multicore, workers = cores)
  freq_fapply(words, chars, ...)
}
# cluster function
fapply_cl <- function(cores = NULL, ...) {
  # default value for cores
  if(is.null(cores))
    cores <- rep(c("oisin", "oscar"), each = 16)
  # future plan
  plan(cluster, workers = cores)
  freq_fapply(words, chars, ...)
}
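fapply_seq(), used in the benchmark below, is presumably the sequential counterpart of the two functions above:
fapply_seq <- function(...) {
  # Sequential future plan
  plan(sequential)
  freq_fapply(words, chars, ...)
}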
Benchmark future
# Microbenchmark
microbenchmark(fapply_seq = fapply_seq(),
               fapply_mc_2 = fapply_mc(cores = 2),
               fapply_mc_10 = fapply_mc(cores = 10),
               fapply_cl = fapply_cl(cores = 2),
               times = 1)
//////////////////////
Load balancing
# Benchmark clusterApply and clusterApplyLB
microbenchmark(
  clusterApply(cl, tasktime, Sys.sleep),
  clusterApplyLB(cl, tasktime, Sys.sleep),
  times = 1
)
# Plot cluster usage
plot_cluster_apply(cl, tasktime, Sys.sleep)
plot_cluster_applyLB(cl, tasktime, Sys.sleep)
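tasktime (and bias_tasktime in the next exercise) are course-provided vectors of per-task sleep durations in seconds; for a self-contained run one could use something like:
tasktime <- runif(20, min = 0.1, max = 0.5)
bias_tasktime <- sort(tasktime)  # hypothetical: tasks ordered by duration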
Scheduling
# Plot cluster usage for parSapply
plot_parSapply(cl, tasktime, Sys.sleep)
# Microbenchmark
microbenchmark(
  clusterApplyLB(cl, bias_tasktime, Sys.sleep),
  parSapply(cl, bias_tasktime, Sys.sleep),
  times = 1
)
# Plot cluster usage for parSapply and clusterApplyLB
plot_parSapply(cl, bias_tasktime, Sys.sleep)
plot_cluster_applyLB(cl, bias_tasktime, Sys.sleep)
SOCK vs. FORK
# Register the SOCK cluster
registerDoParallel(cl.sock)
replicate(
  # Use 2 replicates
  n = 2,
  expr = {
    # Set the seed to 100
    set.seed(100)
    # Run two iterations in parallel, bound by rows
    foreach(i = 1:2, .combine = rbind) %dopar% rnorm(3)
  },
  simplify = FALSE
)
///////////////////////
# Change this to register the FORK cluster
registerDoParallel(cl.fork)
# Run this again and look at the output!
replicate(
  n = 2,
  expr = {
    set.seed(100)
    foreach(i = 1:2, .combine = rbind) %dopar% rnorm(3)
  },
  simplify = FALSE
)
//////////////////
Setting an RNG
# Create a cluster
cl <- makeCluster(2)
# Check RNGkind on workers
clusterCall(cl, RNGkind)
# Set the RNG seed on workers
clusterSetRNGStream(cl, 100)
# Check RNGkind on workers
clusterCall(cl, RNGkind)
Reproducible results in parallel
# The cluster, and how many numbers to generate
cl
n_vec
t(replicate(
  # Use 3 replicates
  n = 3,
  expr = {
    # Spread across cl, apply mean_of_rnorm() to n_vec
    clusterApply(cl, n_vec, mean_of_rnorm)
  }
))
//////////////////////
# The cluster, and how many numbers to generate
cl
n_vec
# Run the same code again: without a common RNG seed the results differ
t(replicate(
  # Use 3 replicates
  n = 3,
  expr = {
    # Spread across cl, apply mean_of_rnorm() to n_vec
    clusterApply(cl, n_vec, mean_of_rnorm)
  }
))
//////////////////////
# Make a cluster of size 2
cl2 <- makeCluster(2)
# Set the cluster's RNG stream seed to 1234
clusterSetRNGStream(cl2, iseed = 1234)
# Spread across the cluster, apply mean_of_rnorm() to n_vec
unlist(clusterApply(cl2, n_vec, mean_of_rnorm))
////////////////////////////
# Make a cluster of size 4
cl4 <- makeCluster(4)
# Set the cluster's RNG stream seed to 1234
clusterSetRNGStream(cl4, iseed = 1234)
# Spread across the cluster, apply mean_of_rnorm() to n_vec
unlist(clusterApply(cl4, n_vec, mean_of_rnorm))
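Even though the seed is the same, the 4-node cluster produces different results than the 2-node one: clusterSetRNGStream() gives each worker its own RNG stream, so results are reproducible only for a fixed cluster size.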
Reproducing migration app with foreach
# Register doParallel and doRNG
registerDoParallel(cores = 2)
registerDoRNG(seed)
/////////////////
# Register doParallel and doRNG
registerDoParallel(cores = 2)
registerDoRNG(seed)
# Call ar1_block_of_trajectories via foreach
mpar <- foreach(r = 1:5) %dopar% ar1_block_of_trajectories(r)
//////////////////
# Register doParallel and doRNG
registerDoParallel(cores = 2)
registerDoRNG(seed)
# Call ar1_block_of_trajectories via foreach
mpar <- foreach(r = 1:5) %dopar% ar1_block_of_trajectories(r)
# Register sequential backend, set seed and run foreach
registerDoSEQ()
set.seed(seed)
mseq <- foreach(r = 1:5) %dorng% ar1_block_of_trajectories(r)
/////////////////////
# Register doParallel and doRNG
registerDoParallel(cores = 2)
registerDoRNG(seed)
# Call ar1_block_of_trajectories via foreach
mpar <- foreach(r = 1:5) %dopar% ar1_block_of_trajectories(r)
# Register sequential backend, set seed and run foreach
registerDoSEQ()
set.seed(seed)
mseq <- foreach(r = 1:5) %dorng% ar1_block_of_trajectories(r)
# Check if results identical
identical(mpar, mseq)
Reproducing migration app with future.apply
# Set multiprocess plan
# (note: recent versions of future deprecate multiprocess in favor of multisession)
plan(multiprocess, workers = 2)
//////////////
# Set multiprocess plan
plan(multiprocess, workers = 2)
# Call ar1_block_of_trajectories via future_lapply
mfpar <- future_lapply(1:5, ar1_block_of_trajectories, future.seed = seed)
///////////////
# Set multiprocess plan
plan(multiprocess, workers = 2)
# Call ar1_block_of_trajectories via future_lapply
mfpar <- future_lapply(1:5, ar1_block_of_trajectories, future.seed = seed)
# Set sequential plan and repeat future_lapply()
plan(sequential)
mfseq <- future_lapply(1:5, ar1_block_of_trajectories, future.seed = seed)
//////////////////
# Set multiprocess plan
plan(multiprocess, workers = 2)
# Call ar1_block_of_trajectories via future_lapply
mfpar <- future_lapply(1:5, ar1_block_of_trajectories, future.seed = seed)
# Set sequential plan and repeat future_lapply()
plan(sequential)
mfseq <- future_lapply(1:5, ar1_block_of_trajectories, future.seed = seed)
# Check if results are identical
identical(mfpar, mfseq)