Skip to content

Commit ad72331

Browse files
Show run status for jobs and pipeline (#1017)
## Changes * Show a run status treeItem for job, pipelines and each of the tasks. * The cancel button now also cancels the remote job / pipeline. ## Tests <!-- How is this tested? -->
1 parent 9dcbd87 commit ad72331

21 files changed

+1098
-58
lines changed

packages/databricks-vscode/src/bundle/BundleCommands.ts

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import {
55
TreeNode as BundleResourceExplorerTreeNode,
66
ResourceTreeNode as BundleResourceExplorerResourceTreeNode,
77
} from "../ui/bundle-resource-explorer/types";
8-
import {BundleRunManager} from "./BundleRunManager";
8+
import {BundleRunStatusManager} from "./run/BundleRunStatusManager";
99

1010
const RUNNABLE_RESOURCES = [
1111
"pipelines",
@@ -35,7 +35,7 @@ export class BundleCommands implements Disposable {
3535

3636
constructor(
3737
private readonly bundleRemoteStateModel: BundleRemoteStateModel,
38-
private readonly bundleRunManager: BundleRunManager
38+
private readonly bundleRunStatusManager: BundleRunStatusManager
3939
) {
4040
this.disposables.push(this.outputChannel);
4141
}
@@ -75,7 +75,10 @@ export class BundleCommands implements Disposable {
7575
}
7676
);
7777

78-
await this.bundleRunManager.run(treeNode.resourceKey);
78+
await this.bundleRunStatusManager.run(
79+
treeNode.resourceKey,
80+
treeNode.type
81+
);
7982
}
8083

8184
@onError({popup: {prefix: "Error cancelling run."}})
@@ -84,7 +87,7 @@ export class BundleCommands implements Disposable {
8487
throw new Error(`Resource of ${treeNode.type} is not runnable`);
8588
}
8689

87-
this.bundleRunManager.cancel(treeNode.resourceKey);
90+
this.bundleRunStatusManager.cancel(treeNode.resourceKey);
8891
}
8992

9093
dispose() {

packages/databricks-vscode/src/bundle/models/BundleRemoteStateModel.ts

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@ import {Context, context} from "@databricks/databricks-sdk";
1313

1414
/* eslint-disable @typescript-eslint/naming-convention */
1515
type Resources = Required<BundleTarget>["resources"];
16-
type Resource<K extends keyof Required<Resources>> = Required<Resources>[K];
16+
export type Resource<K extends keyof Required<Resources>> =
17+
Required<Resources>[K];
1718

1819
export type BundleRemoteState = BundleTarget & {
1920
resources?: Resources & {
@@ -25,11 +26,14 @@ export type BundleRemoteState = BundleTarget & {
2526
};
2627
};
2728
};
29+
30+
export type ResourceType = keyof Resources;
31+
2832
/* eslint-enable @typescript-eslint/naming-convention */
2933

3034
export class BundleRemoteStateModel extends BaseModelWithStateCache<BundleRemoteState> {
3135
public target: string | undefined;
32-
private authProvider: AuthProvider | undefined;
36+
public authProvider: AuthProvider | undefined;
3337
protected mutex = new Mutex();
3438
private refreshInterval: NodeJS.Timeout | undefined;
3539

@@ -135,6 +139,16 @@ export class BundleRemoteStateModel extends BaseModelWithStateCache<BundleRemote
135139
return JSON.parse(output);
136140
}
137141

142+
/**
143+
* @param key dot separated string that uniquely identifies a resource in the nested resources object
144+
*/
145+
async getResource(key: string) {
146+
const resources = await this.get("resources");
147+
return key.split(".").reduce((prev: any, k) => {
148+
return prev[k];
149+
}, resources);
150+
}
151+
138152
dispose() {
139153
super.dispose();
140154
if (this.refreshInterval !== undefined) {
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
import {EventEmitter} from "vscode";
2+
import {ResourceType} from "../models/BundleRemoteStateModel";
3+
import {RunState} from "./types";
4+
5+
export abstract class BundleRunStatus {
6+
abstract readonly type: ResourceType;
7+
protected readonly onDidChangeEmitter = new EventEmitter<void>();
8+
readonly onDidChange = this.onDidChangeEmitter.event;
9+
runId: string | undefined;
10+
data: any;
11+
12+
protected _runState: RunState = "unknown";
13+
public get runState(): RunState {
14+
return this._runState;
15+
}
16+
public set runState(value: RunState) {
17+
this._runState = value;
18+
this.onDidChangeEmitter.fire();
19+
}
20+
21+
abstract parseId(output: string): void;
22+
abstract cancel(): Promise<void>;
23+
}
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
import {Disposable, EventEmitter} from "vscode";
2+
import {
3+
BundleRemoteStateModel,
4+
Resource,
5+
ResourceType,
6+
} from "../models/BundleRemoteStateModel";
7+
import {BundleRunTerminalManager} from "./BundleRunTerminalManager";
8+
import {JobRunStatus} from "./JobRunStatus";
9+
import {AuthProvider} from "../../configuration/auth/AuthProvider";
10+
import {BundleRunStatus} from "./BundleRunStatus";
11+
import {PipelineRunStatus} from "./PipelineRunStatus";
12+
/**
13+
* This class monitors the cli bundle run output and record ids for runs. It also polls for status of the these runs.
14+
*/
15+
export class BundleRunStatusManager implements Disposable {
16+
private disposables: Disposable[] = [];
17+
public readonly runStatuses: Map<string, BundleRunStatus> = new Map();
18+
19+
private readonly onDidChangeEmitter = new EventEmitter<void>();
20+
readonly onDidChange = this.onDidChangeEmitter.event;
21+
22+
constructor(
23+
private readonly bundleRemoteStateModel: BundleRemoteStateModel,
24+
private readonly bundleRunTerminalManager: BundleRunTerminalManager
25+
) {}
26+
27+
getRunStatusMonitor(
28+
resourceKey: string,
29+
resourceType: ResourceType,
30+
authProvider: AuthProvider,
31+
resource: any
32+
): BundleRunStatus {
33+
switch (resourceType) {
34+
case "jobs":
35+
return new JobRunStatus(authProvider);
36+
case "pipelines": {
37+
const id = (resource as Resource<"pipelines">[string]).id;
38+
if (id === undefined) {
39+
throw new Error(
40+
`Pipeline id is undefined for ${resourceKey}. This likely means the pipeline is not deployed.`
41+
);
42+
}
43+
44+
return new PipelineRunStatus(authProvider, id);
45+
}
46+
default:
47+
throw new Error(`Unknown resource type ${resourceType}`);
48+
}
49+
}
50+
51+
async run(resourceKey: string, resourceType: ResourceType) {
52+
const target = this.bundleRemoteStateModel.target;
53+
const authProvider = this.bundleRemoteStateModel.authProvider;
54+
const resource =
55+
await this.bundleRemoteStateModel.getResource(resourceKey);
56+
57+
if (target === undefined) {
58+
throw new Error(`Cannot run ${resourceKey}, Target is undefined`);
59+
}
60+
if (authProvider === undefined) {
61+
throw new Error(
62+
`Cannot run ${resourceKey}, AuthProvider is undefined`
63+
);
64+
}
65+
if (resource === undefined) {
66+
throw new Error(
67+
`Cannot run ${resourceKey}, Resource is not deployed`
68+
);
69+
}
70+
71+
const remoteRunStatus = this.getRunStatusMonitor(
72+
resourceKey,
73+
resourceType,
74+
authProvider,
75+
resource
76+
);
77+
this.runStatuses.set(resourceKey, remoteRunStatus);
78+
this.disposables.push(
79+
remoteRunStatus.onDidChange(() => {
80+
this.onDidChangeEmitter.fire();
81+
})
82+
);
83+
await this.bundleRunTerminalManager.run(resourceKey, (data) => {
84+
remoteRunStatus.parseId(data);
85+
});
86+
}
87+
88+
async cancel(resourceKey: string) {
89+
const runner = this.runStatuses.get(resourceKey);
90+
this.bundleRunTerminalManager.cancel(resourceKey);
91+
await runner?.cancel();
92+
}
93+
94+
dispose() {
95+
this.disposables.forEach((d) => d.dispose());
96+
}
97+
}
Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
import {CancellationTokenSource, Disposable, Terminal, window} from "vscode";
2+
import {BundleRemoteStateModel} from "../models/BundleRemoteStateModel";
3+
import {CustomOutputTerminal} from "./CustomOutputTerminal";
4+
5+
export class BundleRunTerminalManager implements Disposable {
6+
private disposables: Disposable[] = [];
7+
private terminalDetails: Map<
8+
string,
9+
{
10+
terminal: Terminal;
11+
pty: CustomOutputTerminal;
12+
}
13+
> = new Map();
14+
15+
private cancellationTokenSources: Map<string, CancellationTokenSource> =
16+
new Map();
17+
18+
constructor(
19+
private readonly bundleRemoteStateModel: BundleRemoteStateModel
20+
) {}
21+
22+
getTerminalName(target: string, resourceKey: string) {
23+
return `Run ${resourceKey} (${target})`;
24+
}
25+
26+
async run(resourceKey: string, onDidUpdate?: (data: string) => void) {
27+
const target = this.bundleRemoteStateModel.target;
28+
if (target === undefined) {
29+
throw new Error(`Cannot run ${resourceKey}, Target is undefined`);
30+
}
31+
const terminalName = this.getTerminalName(target, resourceKey);
32+
33+
if (!this.terminalDetails.has(terminalName)) {
34+
this.terminalDetails.set(
35+
terminalName,
36+
this.createTerminal(terminalName)
37+
);
38+
}
39+
let terminal = this.terminalDetails.get(terminalName)!;
40+
41+
const disposables: Disposable[] = [];
42+
try {
43+
terminal.terminal.show();
44+
if (
45+
window.terminals.find(
46+
(i) => i.name === terminal?.terminal.name
47+
) === undefined
48+
) {
49+
// The terminal has been closed. Recreate everything.
50+
terminal = this.createTerminal(terminalName);
51+
this.terminalDetails.set(terminalName, terminal);
52+
}
53+
if (terminal.pty.process !== undefined) {
54+
// There is already a process running. Raise error
55+
throw new Error(
56+
`Process already running. Pid: ${terminal.pty.process.pid}`
57+
);
58+
}
59+
60+
const cancellationTokenSource = new CancellationTokenSource();
61+
this.cancellationTokenSources.set(
62+
terminalName,
63+
cancellationTokenSource
64+
);
65+
const onCancellationEvent =
66+
cancellationTokenSource.token.onCancellationRequested(() => {
67+
terminal?.pty.close();
68+
//Dispose self on cancellation
69+
onCancellationEvent.dispose();
70+
}, this.disposables);
71+
72+
const cmd = this.bundleRemoteStateModel.getRunCommand(resourceKey);
73+
74+
// spawn a new process with the latest command, in the same terminal.
75+
terminal.pty.spawn(cmd);
76+
terminal.terminal.show();
77+
78+
disposables.push(
79+
terminal.pty.onDidWrite((data) => {
80+
onDidUpdate?.(data);
81+
})
82+
);
83+
84+
// Wait for the process to exit
85+
await new Promise<void>((resolve, reject) => {
86+
if (terminal === undefined) {
87+
resolve();
88+
return;
89+
}
90+
terminal.pty.onDidCloseProcess((exitCode) => {
91+
if (exitCode === 0 || terminal.pty.isClosed) {
92+
// Resolve when the process exits with code 0 or is closed by human action
93+
resolve();
94+
} else {
95+
reject(
96+
new Error(`Process exited with code ${exitCode}`)
97+
);
98+
}
99+
}, disposables);
100+
window.onDidCloseTerminal((e) => {
101+
// Resolve when the process is closed by human action
102+
e.name === terminal.terminal.name && reject(resolve());
103+
}, disposables);
104+
});
105+
} finally {
106+
disposables.forEach((i) => i.dispose());
107+
108+
this.cancellationTokenSources.get(terminalName)?.cancel();
109+
this.cancellationTokenSources.get(terminalName)?.dispose();
110+
this.cancellationTokenSources.delete(terminalName);
111+
}
112+
}
113+
114+
createTerminal(terminalName: string) {
115+
const pty = new CustomOutputTerminal();
116+
const terminal = {
117+
pty,
118+
terminal: window.createTerminal({
119+
name: terminalName,
120+
pty,
121+
isTransient: true,
122+
}),
123+
};
124+
125+
this.disposables.push(terminal.terminal);
126+
return terminal;
127+
}
128+
129+
cancel(resourceKey: string) {
130+
const target = this.bundleRemoteStateModel.target;
131+
if (target === undefined) {
132+
throw new Error(
133+
`Cannot delete ${resourceKey}, Target is undefined`
134+
);
135+
}
136+
137+
const terminalName = this.getTerminalName(target, resourceKey);
138+
this.cancellationTokenSources.get(terminalName)?.cancel();
139+
this.cancellationTokenSources.get(terminalName)?.dispose();
140+
this.cancellationTokenSources.delete(terminalName);
141+
}
142+
143+
cancelAll() {
144+
this.cancellationTokenSources.forEach((cs) => {
145+
cs.cancel();
146+
});
147+
148+
this.cancellationTokenSources.clear();
149+
}
150+
151+
dispose() {
152+
this.disposables.forEach((i) => i.dispose());
153+
}
154+
}

0 commit comments

Comments
 (0)