-
Notifications
You must be signed in to change notification settings - Fork 15
Expand file tree
/
Copy pathquantile.R
More file actions
201 lines (194 loc) · 7 KB
/
quantile.R
File metadata and controls
201 lines (194 loc) · 7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
quantile_col <- function(db, incoming_table_name, probs, ci, qualifiers) {
q_table_name <- quote_table_name(db, incoming_table_name, qualifiers = qualifiers)
qcount <- paste0("
SELECT
COUNT(1)
FROM
", q_table_name, "
WHERE
", quote_identifier(db, ci), " IS NOT NULL")
nrows <- as.numeric(rq_get_query(db, qcount)[[1]][[1]])
# was coming back s integer64 and messing up pmax(), pmin()
if(nrows<1) {
return(rep(NA, length(probs)))
}
indexes <- round((nrows+0.5)*probs)
indexes <- pmax(1, indexes)
indexes <- pmin(nrows, indexes)
# deal with repeated indexes
# also make sure index 1 is present so we get something back
# if there is only one value
uindexes <- sort(unique(c(1, indexes, nrows)))
indexes_str <- paste(uindexes, collapse = ", ")
unpack <- match(indexes, uindexes)
q <- paste0("
SELECT
*
FROM (
SELECT
", quote_identifier(db, ci), ",
ROW_NUMBER() OVER (ORDER BY ", quote_identifier(db, ci), ") AS idx_rquery
FROM
", q_table_name, "
WHERE
", quote_identifier(db, ci), " IS NOT NULL
) rquery_qtable
WHERE
idx_rquery IN (", indexes_str, ")
ORDER BY
idx_rquery")
r <- rq_get_query(db, q)
r[[ci]][unpack]
}
# same semantics as DB fn.
quantile_col_d <- function(d, probs, ci) {
dcol <- d[[ci]][!is.na(d[[ci]])]
nrows <- length(dcol)
# was coming back s integer64 and messing up pmax(), pmin()
if(nrows<1) {
return(rep(NA, length(probs)))
}
indexes <- round((nrows+0.5)*probs)
indexes <- pmax(1, indexes)
indexes <- pmin(nrows, indexes)
# deal with repeated indexes
# also make sure index 1 is present so we get something back
# if there is only one value
uindexes <- sort(unique(c(1, indexes, nrows)))
indexes_str <- paste(uindexes, collapse = ", ")
unpack <- match(indexes, uindexes)
r <- sort(dcol)[uindexes]
r[unpack]
}
#' Compute quantiles of specified columns
#' (without interpolation, needs a database with window functions).
#'
#' @param db database connection
#' @param incoming_table_name name of table to compute quantiles of
#' @param ... force later arguments to bind by name
#' @param probs numeric, probabilities to compute quantiles of
#' @param probs_name character name for probability column
#' @param cols character, columns to compute quantiles of
#' @param qualifiers optional named ordered vector of strings carrying additional db hierarchy terms, such as schema.
#' @return data.frame of quantiles
#'
#' @seealso \code{\link{quantile_node}}, \code{\link{rsummary}}
#'
#' @export
#'
quantile_cols <- function(db, incoming_table_name,
...,
probs = seq(0, 1, 0.25),
probs_name = "quantile_probability",
cols = rq_colnames(db, incoming_table_name),
qualifiers = NULL) {
wrapr::stop_if_dot_args(substitute(list(...)), "rquery::quantile_cols")
qtable <- data.frame(probs = probs)
colnames(qtable) <- probs_name
for(ci in cols) {
qi <- quantile_col(db, incoming_table_name, probs, ci,
qualifiers = qualifiers)
qtable[[ci]] <- qi
}
qtable
}
quantile_cols_d <- function(d,
...,
probs = seq(0, 1, 0.25),
probs_name = "quantile_probability",
cols = column_names(d)) {
if(!is.data.frame(d)) {
stop("rquery::quantile_cols_d: d must be a data.frame")
}
wrapr::stop_if_dot_args(substitute(list(...)), "rquery::quantile_cols_d")
qtable <- data.frame(probs = probs)
colnames(qtable) <- probs_name
for(ci in cols) {
qi <- quantile_col_d(d, probs, ci)
qtable[[ci]] <- qi
}
qtable
}
#' Compute quantiles over non-NULL values
#' (without interpolation, needs a database with window functions).
#'
#' Please see \url{https://github.com/WinVector/rquery/blob/master/extras/Summary_Example.md} for an example.
#'
#' This is a non_sql_node, so please see \code{\link{non_sql_node}} for some of the issues for this node type.
#'
#' @param source source to select from (relop or data.frame).
#' @param cols character, compute quantiles for these columns (NULL indicates all columns).
#' @param ... force later arguments to be bound by name
#' @param probs_name character, column name to write probs in.
#' @param probs numeric quantiles to compute
#' @param tmp_name_source wrapr::mk_tmp_name_source(), temporary name generator.
#' @param temporary logical, if TRUE use temporary tables
#' @param qualifiers optional named ordered vector of strings carrying additional db hierarchy terms, such as schema.
#' @return table of quantiles
#'
#' @seealso \code{\link{quantile_cols}}, \code{\link{rsummary}}, \code{\link{non_sql_node}}
#'
#'
#' @export
#'
quantile_node <- function(source,
cols = NULL,
...,
probs_name = "quantile_probability",
probs = seq(0, 1, 0.25),
tmp_name_source = wrapr::mk_tmp_name_source("qn"),
temporary = TRUE,
qualifiers = NULL) {
wrapr::stop_if_dot_args(substitute(list(...)), "rquery::quantile_node.relop")
if(probs_name %in% cols) {
stop("rquery::quantile_node.relop probs_name must be disjoint from cols")
}
have <- column_names(source)
if(!is.null(cols)) {
check_have_cols(have, cols, "rquery::quantile_node.relop cols")
} else {
cols <- have
}
force(cols)
force(probs_name)
force(probs)
force(temporary)
incoming_table_name = tmp_name_source()
outgoing_table_name = tmp_name_source()
f_db <- function(db,
incoming_table_name,
outgoing_table_name,
nd = NULL,
...) {
qtable <- quantile_cols(db, incoming_table_name,
probs = probs,
probs_name = probs_name,
cols = cols,
qualifiers = qualifiers)
rq_copy_to(db,
table_name = outgoing_table_name,
d = qtable,
overwrite = TRUE,
temporary = temporary,
qualifiers = qualifiers)
}
f_df <- function(d, nd = NULL) {
quantile_cols_d(d,
probs = probs,
probs_name = probs_name,
cols = cols)
}
nd <- non_sql_node(source,
f_db = f_db,
f_df = f_df,
f_dt = NULL,
incoming_table_name = incoming_table_name,
incoming_qualifiers = qualifiers,
outgoing_table_name = outgoing_table_name,
outgoing_qualifiers = qualifiers,
columns_produced = c(probs_name, cols),
display_form = paste0("quantile_node(.)"),
orig_columns = FALSE,
temporary = temporary)
nd
}