Skip to content

Commit 879209a

Browse files
committed
Add variable limit batching to insertCommits, getCommitIDs, insertBranchCommits
insertChanges and insertSnapshots were already batched by MaxSQLVariables, but insertCommits (8 cols * 500 = 4000 vars), getCommitIDs, and insertBranchCommits (3 cols * 500 = 1500 vars) were not. The modernc.org sqlite driver has a limit of 32766 variables, so this wasn't causing failures in practice, but now all batch insert methods consistently respect the limit.
1 parent b676ce9 commit 879209a

File tree

1 file changed

+85
-44
lines changed

1 file changed

+85
-44
lines changed

internal/database/batch_writer.go

Lines changed: 85 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -204,79 +204,120 @@ func (w *BatchWriter) insertCommits(tx *sql.Tx, now time.Time) error {
204204
return nil
205205
}
206206

207-
// Build multi-value INSERT
208-
var sb strings.Builder
209-
sb.WriteString("INSERT INTO commits (sha, message, author_name, author_email, committed_at, has_dependency_changes, created_at, updated_at) VALUES ")
207+
const columnsPerRow = 8
208+
maxRowsPerBatch := MaxSQLVariables / columnsPerRow
210209

211-
args := make([]any, 0, len(w.pendingCommits)*8)
212-
for i, pc := range w.pendingCommits {
213-
if i > 0 {
214-
sb.WriteString(",")
210+
for start := 0; start < len(w.pendingCommits); start += maxRowsPerBatch {
211+
end := start + maxRowsPerBatch
212+
if end > len(w.pendingCommits) {
213+
end = len(w.pendingCommits)
214+
}
215+
batch := w.pendingCommits[start:end]
216+
217+
var sb strings.Builder
218+
sb.WriteString("INSERT INTO commits (sha, message, author_name, author_email, committed_at, has_dependency_changes, created_at, updated_at) VALUES ")
219+
220+
args := make([]any, 0, len(batch)*columnsPerRow)
221+
for i, pc := range batch {
222+
if i > 0 {
223+
sb.WriteString(",")
224+
}
225+
sb.WriteString("(?,?,?,?,?,?,?,?)")
226+
227+
hasChanges := 0
228+
if pc.hasChanges {
229+
hasChanges = 1
230+
}
231+
args = append(args, pc.info.SHA, pc.info.Message, pc.info.AuthorName, pc.info.AuthorEmail, pc.info.CommittedAt, hasChanges, now, now)
215232
}
216-
sb.WriteString("(?,?,?,?,?,?,?,?)")
217233

218-
hasChanges := 0
219-
if pc.hasChanges {
220-
hasChanges = 1
234+
if _, err := tx.Exec(sb.String(), args...); err != nil {
235+
return err
221236
}
222-
args = append(args, pc.info.SHA, pc.info.Message, pc.info.AuthorName, pc.info.AuthorEmail, pc.info.CommittedAt, hasChanges, now, now)
223237
}
224238

225-
_, err := tx.Exec(sb.String(), args...)
226-
return err
239+
return nil
227240
}
228241

229242
func (w *BatchWriter) getCommitIDs(tx *sql.Tx) (map[string]int64, error) {
230243
if len(w.pendingCommits) == 0 {
231244
return make(map[string]int64), nil
232245
}
233246

234-
// Build IN clause
235-
shas := make([]any, len(w.pendingCommits))
236-
placeholders := make([]string, len(w.pendingCommits))
237-
for i, pc := range w.pendingCommits {
238-
shas[i] = pc.info.SHA
239-
placeholders[i] = "?"
240-
}
247+
result := make(map[string]int64)
241248

242-
query := "SELECT sha, id FROM commits WHERE sha IN (" + strings.Join(placeholders, ",") + ")"
243-
rows, err := tx.Query(query, shas...)
244-
if err != nil {
245-
return nil, err
246-
}
247-
defer func() { _ = rows.Close() }()
249+
for start := 0; start < len(w.pendingCommits); start += MaxSQLVariables {
250+
end := start + MaxSQLVariables
251+
if end > len(w.pendingCommits) {
252+
end = len(w.pendingCommits)
253+
}
254+
batch := w.pendingCommits[start:end]
248255

249-
result := make(map[string]int64)
250-
for rows.Next() {
251-
var sha string
252-
var id int64
253-
if err := rows.Scan(&sha, &id); err != nil {
256+
shas := make([]any, len(batch))
257+
placeholders := make([]string, len(batch))
258+
for i, pc := range batch {
259+
shas[i] = pc.info.SHA
260+
placeholders[i] = "?"
261+
}
262+
263+
query := "SELECT sha, id FROM commits WHERE sha IN (" + strings.Join(placeholders, ",") + ")"
264+
rows, err := tx.Query(query, shas...)
265+
if err != nil {
266+
return nil, err
267+
}
268+
269+
for rows.Next() {
270+
var sha string
271+
var id int64
272+
if err := rows.Scan(&sha, &id); err != nil {
273+
_ = rows.Close()
274+
return nil, err
275+
}
276+
result[sha] = id
277+
}
278+
if err := rows.Err(); err != nil {
279+
_ = rows.Close()
254280
return nil, err
255281
}
256-
result[sha] = id
282+
_ = rows.Close()
257283
}
258-
return result, rows.Err()
284+
285+
return result, nil
259286
}
260287

261288
func (w *BatchWriter) insertBranchCommits(tx *sql.Tx, commitIDs map[string]int64) error {
262289
if len(w.pendingCommits) == 0 {
263290
return nil
264291
}
265292

266-
var sb strings.Builder
267-
sb.WriteString("INSERT INTO branch_commits (branch_id, commit_id, position) VALUES ")
293+
const columnsPerRow = 3
294+
maxRowsPerBatch := MaxSQLVariables / columnsPerRow
268295

269-
args := make([]any, 0, len(w.pendingCommits)*3)
270-
for i, pc := range w.pendingCommits {
271-
if i > 0 {
272-
sb.WriteString(",")
296+
for start := 0; start < len(w.pendingCommits); start += maxRowsPerBatch {
297+
end := start + maxRowsPerBatch
298+
if end > len(w.pendingCommits) {
299+
end = len(w.pendingCommits)
300+
}
301+
batch := w.pendingCommits[start:end]
302+
303+
var sb strings.Builder
304+
sb.WriteString("INSERT INTO branch_commits (branch_id, commit_id, position) VALUES ")
305+
306+
args := make([]any, 0, len(batch)*columnsPerRow)
307+
for i, pc := range batch {
308+
if i > 0 {
309+
sb.WriteString(",")
310+
}
311+
sb.WriteString("(?,?,?)")
312+
args = append(args, w.branchID, commitIDs[pc.info.SHA], pc.position)
313+
}
314+
315+
if _, err := tx.Exec(sb.String(), args...); err != nil {
316+
return err
273317
}
274-
sb.WriteString("(?,?,?)")
275-
args = append(args, w.branchID, commitIDs[pc.info.SHA], pc.position)
276318
}
277319

278-
_, err := tx.Exec(sb.String(), args...)
279-
return err
320+
return nil
280321
}
281322

282323
func (w *BatchWriter) ensureManifests(tx *sql.Tx, now time.Time) error {

0 commit comments

Comments
 (0)