Skip to content
This repository was archived by the owner on Mar 4, 2026. It is now read-only.

Commit 6a9f1a2

Browse files
feat(spanner): add support for multiplexed session for r/w transactions (#2351)
* feat(spanner): add support for multiplexed session for r/w transactions * test: commit retry logic * support in batch write * support to createBatchTransaction * refactor test --------- Co-authored-by: surbhigarg92 <[email protected]>
1 parent a328811 commit 6a9f1a2

File tree

5 files changed

+1659
-717
lines changed

5 files changed

+1659
-717
lines changed

observability-test/database.ts

Lines changed: 36 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -705,7 +705,7 @@ describe('Database', () => {
705705
const RESPONSE = {a: 'b'};
706706

707707
beforeEach(() => {
708-
database.pool_ = {
708+
database.sessionFactory_ = {
709709
getSession(callback) {
710710
callback(null, SESSION);
711711
},
@@ -715,7 +715,7 @@ describe('Database', () => {
715715
it('with session error', done => {
716716
const error = new Error('with session error');
717717

718-
database.pool_ = {
718+
database.sessionFactory_ = {
719719
getSession(callback) {
720720
callback(error);
721721
},
@@ -913,21 +913,24 @@ describe('Database', () => {
913913
});
914914

915915
describe('getTransaction', () => {
916-
let fakePool: FakeSessionPool;
916+
let fakeSessionFactory: FakeSessionFactory;
917917
let fakeSession: FakeSession;
918918
let fakeTransaction: FakeTransaction;
919919

920920
let getSessionStub: sinon.SinonStub;
921921

922922
beforeEach(() => {
923-
fakePool = database.pool_;
923+
fakeSessionFactory = database.sessionFactory_;
924924
fakeSession = new FakeSession();
925925
fakeTransaction = new FakeTransaction(
926926
{} as google.spanner.v1.TransactionOptions.ReadWrite,
927927
);
928928

929929
getSessionStub = (
930-
sandbox.stub(fakePool, 'getSession') as sinon.SinonStub
930+
sandbox.stub(
931+
fakeSessionFactory,
932+
'getSessionForReadWrite',
933+
) as sinon.SinonStub
931934
).callsFake(callback => {
932935
callback(null, fakeSession, fakeTransaction);
933936
});
@@ -1255,7 +1258,7 @@ describe('Database', () => {
12551258

12561259
const mutationGroups = [mutationGroup1, mutationGroup2];
12571260

1258-
let fakePool: FakeSessionPool;
1261+
let fakeSessionFactory: FakeSessionFactory;
12591262
let fakeSession: FakeSession;
12601263
let fakeDataStream: Transform;
12611264
let getSessionStub: sinon.SinonStub;
@@ -1269,12 +1272,15 @@ describe('Database', () => {
12691272
} as BatchWriteOptions;
12701273

12711274
beforeEach(() => {
1272-
fakePool = database.pool_;
1275+
fakeSessionFactory = database.sessionFactory_;
12731276
fakeSession = new FakeSession();
12741277
fakeDataStream = through.obj();
12751278

12761279
getSessionStub = (
1277-
sandbox.stub(fakePool, 'getSession') as sinon.SinonStub
1280+
sandbox.stub(
1281+
fakeSessionFactory,
1282+
'getSessionForReadWrite',
1283+
) as sinon.SinonStub
12781284
).callsFake(callback => callback(null, fakeSession));
12791285

12801286
sandbox.stub(database, 'requestStream').returns(fakeDataStream);
@@ -1464,23 +1470,26 @@ describe('Database', () => {
14641470
{} as google.spanner.v1.TransactionOptions.ReadWrite,
14651471
);
14661472

1467-
let pool: FakeSessionPool;
1473+
let fakeSessionFactory: FakeSessionFactory;
14681474

14691475
beforeEach(() => {
1470-
pool = database.pool_;
1476+
fakeSessionFactory = database.sessionFactory_;
14711477

1472-
(sandbox.stub(pool, 'getSession') as sinon.SinonStub).callsFake(
1473-
callback => {
1474-
callback(null, SESSION, TRANSACTION);
1475-
},
1476-
);
1478+
(
1479+
sandbox.stub(
1480+
fakeSessionFactory,
1481+
'getSessionForReadWrite',
1482+
) as sinon.SinonStub
1483+
).callsFake(callback => {
1484+
callback(null, SESSION, TRANSACTION);
1485+
});
14771486
});
14781487

14791488
it('with error getting session', done => {
14801489
const fakeErr = new Error('getting a session');
14811490

1482-
(pool.getSession as sinon.SinonStub).callsFake(callback =>
1483-
callback(fakeErr),
1491+
(fakeSessionFactory.getSessionForReadWrite as sinon.SinonStub).callsFake(
1492+
callback => callback(fakeErr),
14841493
);
14851494

14861495
database.runTransaction(
@@ -1598,15 +1607,18 @@ describe('Database', () => {
15981607
{} as google.spanner.v1.TransactionOptions.ReadWrite,
15991608
);
16001609

1601-
let pool: FakeSessionPool;
1610+
let fakeSessionFactory: FakeSessionFactory;
16021611

16031612
beforeEach(() => {
1604-
pool = database.pool_;
1605-
(sandbox.stub(pool, 'getSession') as sinon.SinonStub).callsFake(
1606-
callback => {
1607-
callback(null, SESSION, TRANSACTION);
1608-
},
1609-
);
1613+
fakeSessionFactory = database.sessionFactory_;
1614+
(
1615+
sandbox.stub(
1616+
fakeSessionFactory,
1617+
'getSessionForReadWrite',
1618+
) as sinon.SinonStub
1619+
).callsFake(callback => {
1620+
callback(null, SESSION, TRANSACTION);
1621+
});
16101622
});
16111623

16121624
it('with no error', async () => {

src/database.ts

Lines changed: 84 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -906,7 +906,7 @@ class Database extends common.GrpcServiceObject {
906906
'Database.createBatchTransaction',
907907
this._traceConfig,
908908
span => {
909-
this.pool_.getSession((err, session) => {
909+
this.sessionFactory_.getSession((err, session) => {
910910
if (err) {
911911
setSpanError(span, err);
912912
span.end();
@@ -2285,26 +2285,28 @@ class Database extends common.GrpcServiceObject {
22852285
transactionTag: options.requestOptions?.transactionTag,
22862286
},
22872287
span => {
2288-
this.pool_.getSession((err, session, transaction) => {
2289-
if (!err) {
2290-
if (options.requestOptions) {
2291-
transaction!.requestOptions = Object.assign(
2292-
transaction!.requestOptions || {},
2293-
options.requestOptions,
2288+
this.sessionFactory_.getSessionForReadWrite(
2289+
(err, session, transaction) => {
2290+
if (!err) {
2291+
if (options.requestOptions) {
2292+
transaction!.requestOptions = Object.assign(
2293+
transaction!.requestOptions || {},
2294+
options.requestOptions,
2295+
);
2296+
}
2297+
transaction?.setReadWriteTransactionOptions(
2298+
options as RunTransactionOptions,
22942299
);
2300+
span.addEvent('Using Session', {'session.id': session?.id});
2301+
transaction!._observabilityOptions = this._observabilityOptions;
2302+
this._releaseOnEnd(session!, transaction!, span);
2303+
} else {
2304+
setSpanError(span, err);
22952305
}
2296-
transaction?.setReadWriteTransactionOptions(
2297-
options as RunTransactionOptions,
2298-
);
2299-
span.addEvent('Using Session', {'session.id': session?.id});
2300-
transaction!._observabilityOptions = this._observabilityOptions;
2301-
this._releaseOnEnd(session!, transaction!, span);
2302-
} else {
2303-
setSpanError(span, err);
2304-
}
2305-
span.end();
2306-
cb!(err as grpc.ServiceError | null, transaction);
2307-
});
2306+
span.end();
2307+
cb!(err as grpc.ServiceError | null, transaction);
2308+
},
2309+
);
23082310
},
23092311
);
23102312
}
@@ -2525,8 +2527,8 @@ class Database extends common.GrpcServiceObject {
25252527
config: RequestConfig,
25262528
callback?: PoolRequestCallback,
25272529
): void | Promise<Session> {
2528-
const pool = this.pool_;
2529-
pool.getSession((err, session) => {
2530+
const sessionFactory_ = this.sessionFactory_;
2531+
sessionFactory_.getSessionForReadWrite((err, session) => {
25302532
if (err) {
25312533
callback!(err as ServiceError, null);
25322534
return;
@@ -2536,7 +2538,7 @@ class Database extends common.GrpcServiceObject {
25362538
span.addEvent('Using Session', {'session.id': session?.id});
25372539
config.reqOpts.session = session!.formattedName_;
25382540
this.request<Session>(config, (err, ...args) => {
2539-
pool.release(session!);
2541+
sessionFactory_.release(session!);
25402542
callback!(err, ...args);
25412543
});
25422544
});
@@ -2553,7 +2555,7 @@ class Database extends common.GrpcServiceObject {
25532555
makePooledStreamingRequest_(config: RequestConfig): Readable {
25542556
// eslint-disable-next-line @typescript-eslint/no-this-alias
25552557
const self = this;
2556-
const pool = this.pool_;
2558+
const sessionFactory_ = this.sessionFactory_;
25572559
let requestStream: CancelableDuplex;
25582560
let session: Session | null;
25592561
const waitForSessionStream = streamEvents(through.obj());
@@ -2569,12 +2571,12 @@ class Database extends common.GrpcServiceObject {
25692571
}
25702572
function releaseSession() {
25712573
if (session) {
2572-
pool.release(session);
2574+
sessionFactory_.release(session);
25732575
session = null;
25742576
}
25752577
}
25762578
waitForSessionStream.on('reading', () => {
2577-
pool.getSession((err, session_) => {
2579+
sessionFactory_.getSession((err, session_) => {
25782580
const span = getActiveOrNoopSpan();
25792581
if (err) {
25802582
setSpanError(span, err as ServiceError);
@@ -3339,64 +3341,66 @@ class Database extends common.GrpcServiceObject {
33393341
transactionTag: options.requestOptions?.transactionTag,
33403342
},
33413343
span => {
3342-
this.pool_.getSession((err, session?, transaction?) => {
3343-
if (err) {
3344-
setSpanError(span, err);
3345-
}
3344+
this.sessionFactory_.getSessionForReadWrite(
3345+
(err, session?, transaction?) => {
3346+
if (err) {
3347+
setSpanError(span, err);
3348+
}
33463349

3347-
if (err && isSessionNotFoundError(err as grpc.ServiceError)) {
3348-
span.addEvent('No session available', {
3349-
'session.id': session?.id,
3350-
});
3351-
span.end();
3352-
this.runTransaction(options, runFn!);
3353-
return;
3354-
}
3350+
if (err && isSessionNotFoundError(err as grpc.ServiceError)) {
3351+
span.addEvent('No session available', {
3352+
'session.id': session?.id,
3353+
});
3354+
span.end();
3355+
this.runTransaction(options, runFn!);
3356+
return;
3357+
}
33553358

3356-
if (err) {
3357-
span.end();
3358-
runFn!(err as grpc.ServiceError);
3359-
return;
3360-
}
3359+
if (err) {
3360+
span.end();
3361+
runFn!(err as grpc.ServiceError);
3362+
return;
3363+
}
33613364

3362-
transaction!._observabilityOptions = this._observabilityOptions;
3365+
transaction!._observabilityOptions = this._observabilityOptions;
33633366

3364-
transaction!.requestOptions = Object.assign(
3365-
transaction!.requestOptions || {},
3366-
options.requestOptions,
3367-
);
3367+
transaction!.requestOptions = Object.assign(
3368+
transaction!.requestOptions || {},
3369+
options.requestOptions,
3370+
);
33683371

3369-
transaction!.setReadWriteTransactionOptions(
3370-
options as RunTransactionOptions,
3371-
);
3372+
transaction!.setReadWriteTransactionOptions(
3373+
options as RunTransactionOptions,
3374+
);
33723375

3373-
const release = () => {
3374-
this.pool_.release(session!);
3375-
span.end();
3376-
};
3376+
const release = () => {
3377+
this.sessionFactory_.release(session!);
3378+
span.end();
3379+
};
33773380

3378-
const runner = new TransactionRunner(
3379-
session!,
3380-
transaction!,
3381-
runFn!,
3382-
options,
3383-
);
3381+
const runner = new TransactionRunner(
3382+
session!,
3383+
transaction!,
3384+
runFn!,
3385+
options,
3386+
);
33843387

3385-
runner.run().then(release, err => {
3386-
setSpanError(span, err!);
3388+
runner.run().then(release, err => {
3389+
setSpanError(span, err!);
33873390

3388-
if (isSessionNotFoundError(err)) {
3389-
span.addEvent('No session available', {
3390-
'session.id': session?.id,
3391-
});
3392-
release();
3393-
this.runTransaction(options, runFn!);
3394-
} else {
3395-
setImmediate(runFn!, err);
3396-
release();
3397-
}
3398-
});
3399-
});
3391+
if (isSessionNotFoundError(err)) {
3392+
span.addEvent('No session available', {
3393+
'session.id': session?.id,
3394+
});
3395+
release();
3396+
this.runTransaction(options, runFn!);
3397+
} else {
3398+
setImmediate(runFn!, err);
3399+
release();
3400+
}
3401+
});
3402+
},
3403+
);
34003404
},
34013405
);
34023406
}
@@ -3481,7 +3485,9 @@ class Database extends common.GrpcServiceObject {
34813485
: {};
34823486

34833487
let sessionId = '';
3484-
const getSession = this.pool_.getSession.bind(this.pool_);
3488+
const getSession = this.sessionFactory_.getSessionForReadWrite.bind(
3489+
this.sessionFactory_,
3490+
);
34853491

34863492
return startTrace(
34873493
'Database.runTransactionAsync',
@@ -3519,7 +3525,7 @@ class Database extends common.GrpcServiceObject {
35193525
throw e;
35203526
} finally {
35213527
span.end();
3522-
this.pool_.release(session);
3528+
this.sessionFactory_.release(session);
35233529
}
35243530
} catch (e) {
35253531
if (isSessionNotFoundError(e as ServiceError)) {
@@ -3607,7 +3613,7 @@ class Database extends common.GrpcServiceObject {
36073613
transactionTag: options?.requestOptions?.transactionTag,
36083614
},
36093615
span => {
3610-
this.pool_.getSession((err, session) => {
3616+
this.sessionFactory_.getSessionForReadWrite((err, session) => {
36113617
if (err) {
36123618
proxyStream.destroy(err);
36133619
setSpanError(span, err);
@@ -3669,7 +3675,7 @@ class Database extends common.GrpcServiceObject {
36693675
})
36703676
.once('end', () => {
36713677
span.end();
3672-
this.pool_.release(session!);
3678+
this.sessionFactory_.release(session!);
36733679
})
36743680
.pipe(proxyStream);
36753681
});

0 commit comments

Comments
 (0)