Skip to content

Commit b73cc69

Browse files
authored
fix(core): avoid overwhelming DB with connections during analytics init (#34881)
## Current Behavior Every process that initializes analytics opens its own DB connection to get/create a session ID. This includes the CLI, the daemon, and **every plugin worker**. In workspaces with many plugins, dozens of plugin workers spawn simultaneously, each opening a DB connection and querying/writing session metadata. This overwhelms the SQLite database with concurrent connections and causes failures. ## How This Fixes It The root cause is that plugin workers each independently open a DB connection just to read the session ID. The fix eliminates this by having the parent process (CLI or daemon) fetch the session ID once and pass it to child processes via the `NX_ANALYTICS_SESSION_ID` environment variable. Plugin workers inherit this env var and initialize telemetry without touching the DB at all. | Process | Before | After | |---------|--------|-------| | CLI | Opens DB connection | Opens DB connection (1x) | | Daemon | Opens DB connection | Opens DB connection (1x) | | Plugin worker (×N) | Each opens DB connection | Reads env var, **no DB connection** | In a workspace with 20 plugins, this reduces DB connections from 22 (CLI + daemon + 20 workers) down to 2 (CLI + daemon only). ## Two Initialization Paths - **`initializeTelemetry(dbConnection, ...)`** — Used by CLI and daemon. Gets/creates the session ID from the DB via a transaction, stores the connection for persisting session refreshes on flush, and returns the session ID so the caller can set it as an env var for child processes. - **`initializeTelemetryWithSessionId(sessionId, ...)`** — Used by plugin workers. Takes the session ID inherited from the parent process env var. No DB connection, no DB queries. ## Session Refresh for Long-Lived Processes The daemon is long-lived and could hold a stale session ID for hours. The telemetry background thread now tracks activity and generates a new session ID after 30 minutes of inactivity (matching the existing GA4 session timeout). When a session refreshes, the background thread notifies the main thread via a channel, which persists the new session to the DB in a transaction on flush. ## Other Changes - Extracted `TelemetryOptions` struct to replace the long parameter list in `TelemetryService::new` - Moved `SESSION_TIMEOUT_SECS` to `constants.rs` so it can be shared between modules - Extracted `init_service` helper to deduplicate between the two init paths - Extracted `persist_session_to_db` helper that wraps both metadata writes in a transaction - Separated session ID retrieval (`get_or_create_session_id`) from telemetry service initialization ## Related Issue(s) Fixes database connection exhaustion when many plugin workers initialize analytics simultaneously.
1 parent dcdabdd commit b73cc69

6 files changed

Lines changed: 294 additions & 106 deletions

File tree

packages/nx/src/analytics/analytics.ts

Lines changed: 30 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { nxVersion } from '../utils/versions';
44
import { IS_WASM } from '../native';
55
import type {
66
initializeTelemetry as InitializeTelemetryType,
7+
initializeTelemetryWithSessionId as InitializeTelemetryWithSessionIdType,
78
flushTelemetry as FlushTelemetryType,
89
trackEvent as TrackEventType,
910
trackPageView as TrackPageViewType,
@@ -23,6 +24,7 @@ import { getDbConnection } from '../utils/db-connection';
2324

2425
// Conditionally import telemetry functions only on non-WASM platforms
2526
let initializeTelemetry: typeof InitializeTelemetryType;
27+
let initializeTelemetryWithSessionId: typeof InitializeTelemetryWithSessionIdType;
2628
let flushTelemetry: typeof FlushTelemetryType;
2729
let trackEventNative: typeof TrackEventType;
2830
let trackPageViewNative: typeof TrackPageViewType;
@@ -32,6 +34,8 @@ let getEventDimensions: typeof GetEventDimensionsType;
3234
if (!IS_WASM) {
3335
const nativeModule = require('../native');
3436
initializeTelemetry = nativeModule.initializeTelemetry;
37+
initializeTelemetryWithSessionId =
38+
nativeModule.initializeTelemetryWithSessionId;
3539
flushTelemetry = nativeModule.flushTelemetry;
3640
trackEventNative = nativeModule.trackEvent;
3741
trackPageViewNative = nativeModule.trackPageView;
@@ -73,23 +77,33 @@ export async function startAnalytics() {
7377
? `${nodeVersion.major}.${nodeVersion.minor}.${nodeVersion.patch}`
7478
: 'unknown';
7579

80+
const commonArgs = [
81+
workspaceId,
82+
userId,
83+
nxVersion,
84+
packageManagerInfo.name,
85+
packageManagerInfo.version,
86+
nodeVersionString,
87+
os.arch(),
88+
os.platform(),
89+
os.release(),
90+
!!isCI(),
91+
isNxCloud,
92+
] as const;
93+
7694
try {
77-
const dbConnection = getDbConnection();
78-
79-
initializeTelemetry(
80-
dbConnection,
81-
workspaceId,
82-
userId,
83-
nxVersion,
84-
packageManagerInfo.name,
85-
packageManagerInfo.version,
86-
nodeVersionString,
87-
os.arch(),
88-
os.platform(),
89-
os.release(),
90-
!!isCI(),
91-
isNxCloud
92-
);
95+
const sessionId = process.env.NX_ANALYTICS_SESSION_ID;
96+
97+
if (sessionId) {
98+
// Plugin worker path — reuse session ID from parent, no DB needed
99+
initializeTelemetryWithSessionId(sessionId, ...commonArgs);
100+
} else {
101+
// CLI/daemon path — get session from DB, set env var for children
102+
const dbConnection = getDbConnection();
103+
const newSessionId = initializeTelemetry(dbConnection, ...commonArgs);
104+
process.env.NX_ANALYTICS_SESSION_ID = newSessionId;
105+
}
106+
93107
_telemetryInitialized = true;
94108

95109
// Flush analytics automatically on process exit so every code path

packages/nx/src/native/index.d.ts

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -380,11 +380,20 @@ export interface HashInputs {
380380
}
381381

382382
/**
383-
* Initialize the global telemetry service.
384-
* Reads or creates a session ID from the database so that multiple CLI
385-
* invocations within 30 minutes share the same GA4 session.
383+
* Initialize telemetry using a DB connection.
384+
* Gets/creates the session ID from the DB, stores the connection
385+
* for persisting session refreshes on flush, and returns the session ID
386+
* so the caller can set it as an env var for child processes.
387+
* Used by CLI and daemon.
386388
*/
387-
export declare function initializeTelemetry(connection: ExternalObject<NxDbConnection>, workspaceId: string, userId: string, nxVersion: string, packageManagerName: string, packageManagerVersion: string | undefined | null, nodeVersion: string, osArch: string, osPlatform: string, osRelease: string, isCi: boolean, isNxCloud: boolean): void
389+
export declare function initializeTelemetry(connection: ExternalObject<NxDbConnection>, workspaceId: string, userId: string, nxVersion: string, packageManagerName: string, packageManagerVersion: string | undefined | null, nodeVersion: string, osArch: string, osPlatform: string, osRelease: string, isCi: boolean, isNxCloud: boolean): string
390+
391+
/**
392+
* Initialize telemetry with a pre-fetched session ID.
393+
* No DB connection — used by plugin workers that inherit the
394+
* session ID from their parent process via env var.
395+
*/
396+
export declare function initializeTelemetryWithSessionId(sessionId: string, workspaceId: string, userId: string, nxVersion: string, packageManagerName: string, packageManagerVersion: string | undefined | null, nodeVersion: string, osArch: string, osPlatform: string, osRelease: string, isCi: boolean, isNxCloud: boolean): void
388397

389398
export interface InputsInput {
390399
input: string

packages/nx/src/native/native-bindings.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -613,6 +613,7 @@ module.exports.GroupType = nativeBinding.GroupType
613613
module.exports.hashArray = nativeBinding.hashArray
614614
module.exports.hashFile = nativeBinding.hashFile
615615
module.exports.initializeTelemetry = nativeBinding.initializeTelemetry
616+
module.exports.initializeTelemetryWithSessionId = nativeBinding.initializeTelemetryWithSessionId
616617
module.exports.installNxConsole = nativeBinding.installNxConsole
617618
module.exports.installNxConsoleForEditor = nativeBinding.installNxConsoleForEditor
618619
module.exports.IS_WASM = nativeBinding.IS_WASM

packages/nx/src/native/telemetry/constants.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
pub const TRACKING_ID_PROD: &str = "G-BGPKPJK4PY";
22
pub const GA_ENDPOINT: &str = "https://www.google-analytics.com/g/collect";
33
pub const BATCH_INTERVAL_MS: u64 = 50;
4+
/// Session timeout in seconds (30 minutes). If no events are received
5+
/// within this window, a new session ID is generated.
6+
pub const SESSION_TIMEOUT_SECS: u64 = 30 * 60;
47

58
// Google Analytics Measurement Protocol limits
69
pub const MAX_EVENT_NAME_LENGTH: usize = 40;

packages/nx/src/native/telemetry/mod.rs

Lines changed: 96 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,9 @@ use std::collections::HashMap;
2626
use std::sync::{Arc, Mutex};
2727

2828
use crate::native::db::connection::NxDbConnection;
29+
use constants::SESSION_TIMEOUT_SECS;
2930
use constants::event_dimension;
30-
use service::TelemetryService;
31-
32-
/// Session timeout in seconds (30 minutes)
33-
const SESSION_TIMEOUT_SECS: i64 = 30 * 60;
31+
use service::{TelemetryOptions, TelemetryService, persist_session_to_db};
3432

3533
// Global telemetry instance for Rust code to use directly
3634
static GLOBAL_TELEMETRY: OnceCell<Arc<TelemetryService>> = OnceCell::new();
@@ -39,9 +37,45 @@ static GLOBAL_TELEMETRY: OnceCell<Arc<TelemetryService>> = OnceCell::new();
3937
// Public NAPI API
4038
// =============================================================================
4139

42-
/// Initialize the global telemetry service.
43-
/// Reads or creates a session ID from the database so that multiple CLI
44-
/// invocations within 30 minutes share the same GA4 session.
40+
/// Track an event using the global telemetry instance
41+
#[napi]
42+
pub fn track_event(event_name: String, parameters: Option<HashMap<String, String>>) -> Result<()> {
43+
tracing::trace!("Tracking event: {}", event_name);
44+
if let Some(telemetry) = GLOBAL_TELEMETRY.get() {
45+
telemetry.persist_session_refreshes();
46+
telemetry.track_event_impl(event_name, parameters)?;
47+
} else {
48+
tracing::trace!("Telemetry not initialized, skipping event");
49+
}
50+
Ok(())
51+
}
52+
53+
/// Track a page view using the global telemetry instance
54+
#[napi]
55+
pub fn track_page_view(
56+
page_title: String,
57+
page_location: Option<String>,
58+
parameters: Option<HashMap<String, String>>,
59+
) -> Result<()> {
60+
tracing::trace!(
61+
"Tracking page view: {} (location: {:?})",
62+
page_title,
63+
page_location
64+
);
65+
if let Some(telemetry) = GLOBAL_TELEMETRY.get() {
66+
telemetry.persist_session_refreshes();
67+
telemetry.track_page_view_impl(page_title, page_location, parameters)?;
68+
} else {
69+
tracing::trace!("Telemetry not initialized, skipping page view");
70+
}
71+
Ok(())
72+
}
73+
74+
/// Initialize telemetry using a DB connection.
75+
/// Gets/creates the session ID from the DB, stores the connection
76+
/// for persisting session refreshes on flush, and returns the session ID
77+
/// so the caller can set it as an env var for child processes.
78+
/// Used by CLI and daemon.
4579
#[napi]
4680
pub fn initialize_telemetry(
4781
#[napi(ts_arg_type = "ExternalObject<NxDbConnection>")] connection: &External<
@@ -58,18 +92,19 @@ pub fn initialize_telemetry(
5892
os_release: String,
5993
is_ci: bool,
6094
is_nx_cloud: bool,
61-
) -> Result<()> {
95+
) -> Result<String> {
6296
tracing::trace!(
6397
"Initializing telemetry service for workspace: {}",
6498
workspace_id
6599
);
66100

67101
let session_id = get_or_create_session_id(connection)?;
68102

69-
let service = TelemetryService::new(
103+
init_service(TelemetryOptions {
104+
session_id: session_id.clone(),
105+
db_connection: Some((**connection).clone()),
70106
workspace_id,
71107
user_id,
72-
session_id,
73108
nx_version,
74109
package_manager_name,
75110
package_manager_version,
@@ -79,45 +114,59 @@ pub fn initialize_telemetry(
79114
os_release,
80115
is_ci,
81116
is_nx_cloud,
82-
)?;
83-
84-
GLOBAL_TELEMETRY
85-
.set(Arc::new(service))
86-
.map_err(|_| Error::from_reason("Telemetry already initialized"))?;
117+
})?;
87118

88-
tracing::debug!("Telemetry service initialized successfully");
89-
Ok(())
90-
}
91-
92-
/// Track an event using the global telemetry instance
93-
#[napi]
94-
pub fn track_event(event_name: String, parameters: Option<HashMap<String, String>>) -> Result<()> {
95-
tracing::trace!("Tracking event: {}", event_name);
96-
if let Some(telemetry) = GLOBAL_TELEMETRY.get() {
97-
telemetry.track_event_impl(event_name, parameters)?;
98-
} else {
99-
tracing::trace!("Telemetry not initialized, skipping event");
100-
}
101-
Ok(())
119+
Ok(session_id)
102120
}
103121

104-
/// Track a page view using the global telemetry instance
122+
/// Initialize telemetry with a pre-fetched session ID.
123+
/// No DB connection — used by plugin workers that inherit the
124+
/// session ID from their parent process via env var.
105125
#[napi]
106-
pub fn track_page_view(
107-
page_title: String,
108-
page_location: Option<String>,
109-
parameters: Option<HashMap<String, String>>,
126+
pub fn initialize_telemetry_with_session_id(
127+
session_id: String,
128+
workspace_id: String,
129+
user_id: String,
130+
nx_version: String,
131+
package_manager_name: String,
132+
package_manager_version: Option<String>,
133+
node_version: String,
134+
os_arch: String,
135+
os_platform: String,
136+
os_release: String,
137+
is_ci: bool,
138+
is_nx_cloud: bool,
110139
) -> Result<()> {
111140
tracing::trace!(
112-
"Tracking page view: {} (location: {:?})",
113-
page_title,
114-
page_location
141+
"Initializing telemetry service (session from env) for workspace: {}",
142+
workspace_id
115143
);
116-
if let Some(telemetry) = GLOBAL_TELEMETRY.get() {
117-
telemetry.track_page_view_impl(page_title, page_location, parameters)?;
118-
} else {
119-
tracing::trace!("Telemetry not initialized, skipping page view");
120-
}
144+
145+
init_service(TelemetryOptions {
146+
session_id,
147+
workspace_id,
148+
user_id,
149+
nx_version,
150+
package_manager_name,
151+
package_manager_version,
152+
node_version,
153+
os_arch,
154+
os_platform,
155+
os_release,
156+
is_ci,
157+
is_nx_cloud,
158+
db_connection: None,
159+
})
160+
}
161+
162+
fn init_service(opts: TelemetryOptions) -> Result<()> {
163+
let service = TelemetryService::new(opts)?;
164+
165+
GLOBAL_TELEMETRY
166+
.set(Arc::new(service))
167+
.map_err(|_| Error::from_reason("Telemetry already initialized"))?;
168+
169+
tracing::debug!("Telemetry service initialized successfully");
121170
Ok(())
122171
}
123172

@@ -126,6 +175,7 @@ pub fn track_page_view(
126175
#[napi]
127176
pub fn flush_telemetry() -> Result<()> {
128177
if let Some(telemetry) = GLOBAL_TELEMETRY.get() {
178+
telemetry.persist_session_refreshes();
129179
telemetry.flush()?;
130180
}
131181
Ok(())
@@ -178,7 +228,7 @@ pub fn track_rust_event(event_name: impl Into<String>, parameters: HashMap<Strin
178228
/// Reuses an existing session if the last activity was within 30 minutes,
179229
/// otherwise creates a new session.
180230
fn get_or_create_session_id(connection: &External<Arc<Mutex<NxDbConnection>>>) -> Result<String> {
181-
let conn = connection
231+
let mut conn = connection
182232
.lock()
183233
.map_err(|e| Error::from_reason(format!("Failed to lock database connection: {}", e)))?;
184234

@@ -189,7 +239,7 @@ fn get_or_create_session_id(connection: &External<Arc<Mutex<NxDbConnection>>>) -
189239
"SELECT m1.value FROM metadata m1, metadata m2 \
190240
WHERE m1.key = 'SESSION_ID' AND m2.key = 'SESSION_LAST_ACTIVITY' \
191241
AND (strftime('%s', 'now') - strftime('%s', m2.value)) < {}",
192-
SESSION_TIMEOUT_SECS
242+
SESSION_TIMEOUT_SECS as i64
193243
),
194244
[],
195245
|row| row.get::<_, String>(0),
@@ -201,21 +251,12 @@ fn get_or_create_session_id(connection: &External<Arc<Mutex<NxDbConnection>>>) -
201251
id
202252
} else {
203253
let id = uuid::Uuid::new_v4().to_string();
204-
conn.execute(
205-
"INSERT OR REPLACE INTO metadata (key, value) VALUES ('SESSION_ID', ?1)",
206-
[&id],
207-
)
208-
.map_err(|e| Error::from_reason(format!("Failed to save session ID: {}", e)))?;
209254
tracing::trace!("Created new analytics session: {}", id);
210255
id
211256
};
212257

213-
// Always update last activity timestamp
214-
conn.execute(
215-
"INSERT OR REPLACE INTO metadata (key, value) VALUES ('SESSION_LAST_ACTIVITY', datetime('now'))",
216-
[],
217-
)
218-
.map_err(|e| Error::from_reason(format!("Failed to update session activity: {}", e)))?;
258+
// Persist session ID and update activity timestamp
259+
persist_session_to_db(&mut conn, &session_id);
219260

220261
Ok(session_id)
221262
}

0 commit comments

Comments
 (0)