-
Notifications
You must be signed in to change notification settings - Fork 118
Expand file tree
/
Copy pathbuild
More file actions
executable file
·156 lines (130 loc) · 7.19 KB
/
build
File metadata and controls
executable file
·156 lines (130 loc) · 7.19 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
#!/usr/bin/env python
import argparse
from pathlib import Path
import subprocess
import sys
import re
import os
import glob
import datetime
import io
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.compute import Library, LibraryInstallStatus, State
def run_cmd(cmd):
    """Run *cmd* (an argv-style list) and return the CompletedProcess.

    On failure, prints the command's captured stdout/stderr to stderr and
    exits the script with the command's return code.
    """
    try:
        return subprocess.run(cmd, check=True, text=True, capture_output=True)
    except subprocess.CalledProcessError as e:
        # Bug fix: this message was previously built but never printed,
        # so the cause of a failed build step was silently discarded.
        error_str = f'Command "{" ".join(cmd)}" returned code {e.returncode}\n\nStdout:\n{e.stdout}\n\nStderr:\n{e.stderr}'
        print(error_str, file=sys.stderr)
        sys.exit(e.returncode)
def uninstall_if_matches(w, cluster_id, name, lib_type):
    """Uninstall installed cluster libraries of *lib_type* whose path contains *name*.

    Scans the cluster's library statuses for INSTALLED entries whose
    dict form has a *lib_type* key (e.g. 'jar' or 'whl') containing *name*,
    and uninstalls all matches.

    Returns:
        True if at least one matching library was uninstalled, else False.
    """
    matches = []
    for status in w.libraries.cluster_status(cluster_id):
        if status.status != LibraryInstallStatus.INSTALLED:
            continue
        spec = status.library.as_dict()
        if lib_type in spec and name in spec[lib_type]:
            matches.append(status.library)
    if not matches:
        return False
    print(f'Uninstalling existing libraries {", ".join([l.as_dict()[lib_type] for l in matches])} from cluster')
    w.libraries.uninstall(cluster_id, matches)
    return True
class ChDir(object):
    """Context manager that runs the enclosed block inside *dir*.

    The working directory current at construction time is restored on exit,
    even if the body raises.
    """

    def __init__(self, dir):
        self.dir = dir
        # Remember where to return to once the block finishes.
        self.olddir = os.getcwd()

    def __enter__(self):
        os.chdir(self.dir)

    def __exit__(self, *exc_info):
        # Unconditional restore; exceptions propagate (nothing is returned).
        os.chdir(self.olddir)
def validate_cluster(client, cluster_id):
    """Validate that the target cluster can accept the built artifacts.

    Requires a 16.4.x runtime built against Scala 2.12 and the SINGLE_USER
    data security mode. Prints an error and exits with status 1 on any
    failed check; returns None on success.

    Args:
        client: a databricks WorkspaceClient (anything exposing clusters.get)
        cluster_id: ID of the cluster to validate
    """
    cluster = client.clusters.get(cluster_id)

    # Check runtime version (16.4.x-scala2.12)
    runtime = cluster.spark_version
    if runtime is None:
        # Fix: dropped the f-prefix on messages with no placeholders (ruff F541)
        print("Error: Cluster runtime version is not available. Cluster may be in an invalid state.")
        sys.exit(1)
    if not runtime.startswith('16.4.') or 'scala2.12' not in runtime:
        print(f"Error: Cluster must use runtime 16.4.x-scala2.12, found: {runtime}")
        sys.exit(1)

    # Check data security mode (SINGLE_USER)
    security_mode = cluster.data_security_mode
    if security_mode is None:
        print("Error: Cluster data security mode is not available. Cluster may be in an invalid state.")
        sys.exit(1)
    if security_mode.value != 'SINGLE_USER':
        print(f"Error: Cluster must have data_security_mode set to SINGLE_USER, found: {security_mode.value}")
        sys.exit(1)
    print(f"✓ Cluster validation passed: runtime={runtime}, security_mode={security_mode.value}")
def _upload_and_install(client, cluster_id, local_path, remote_prefix, lib_type):
    """Upload *local_path* to the volume under *remote_prefix* and install it
    on *cluster_id* as a library of *lib_type* ('jar' or 'whl').

    Uninstalls any already-installed library of the same type whose path
    contains this artifact's filename first.

    Returns:
        True if a pre-existing matching library was uninstalled (meaning a
        cluster restart is needed for the new library to take effect).
    """
    lib_name = local_path.split('/')[-1]
    uninstalled = uninstall_if_matches(client, cluster_id, lib_name, lib_type)
    # Upload to the volume using the files API
    volume_path = f'{remote_prefix}/{lib_name}'
    with open(local_path, 'rb') as f:
        client.files.upload(volume_path, io.BytesIO(f.read()), overwrite=True)
    # The libraries API expects the dbfs:/Volumes/... form of the path
    install_path = f'dbfs:{volume_path}'
    client.libraries.install(cluster_id, [Library(**{lib_type: install_path})])
    print(f'Installed {lib_type} from {install_path}')
    return uninstalled


def main(args):
    """Build the requested artifacts and optionally install them on a cluster.

    Builds a Python wheel and/or Scala assembly jar from the project root,
    then (if --install was given) uploads them to a Unity Catalog volume and
    installs them on the target cluster, restarting it if older copies of the
    libraries had to be uninstalled.
    """
    project_root = Path(__file__).parent.parent
    os.chdir(project_root)

    whl_path = None
    if args.python:
        with ChDir('python'):
            out = run_cmd(['python', 'setup.py', 'bdist_wheel'])
            # setup.py logs the wheel filename; recover it from the output
            whl_path = str(project_root / ('python/dist/' + re.search(r'glow\.py\S+\.whl', out.stdout).group(0)))
        print(f'Built Python wheel {Path(whl_path).resolve()}')

    jar_path = None
    if args.scala:
        out = run_cmd(['sbt', 'core/assembly'])
        # sbt logs the scala target directory; the newest assembly jar in it
        # is the one this invocation just built
        core_target_dir = re.search(r'core/\S+/scala-[.\d]+/', out.stdout).group(0)
        jars = glob.glob(core_target_dir + '*assembly*.jar')
        jar_path = max(jars, key=os.path.getctime)
        print(f'Built Scala assembly jar {Path(jar_path).resolve()}')

    if args.install:
        # --install requires a volume destination for the artifacts
        if not args.upload_to:
            print("Error: --upload-to is required when --install is specified")
            sys.exit(1)
        upload_to = args.upload_to.rstrip('/')
        if not upload_to.startswith('/Volumes/'):
            print("Error: --upload-to must be a Unity Catalog volume path (e.g., /Volumes/catalog/schema/volume)")
            sys.exit(1)

        now = datetime.datetime.now().strftime('%d-%m-%Y_%H:%M:%S,%f')
        client = WorkspaceClient()
        # Fail fast if the cluster runtime/security mode is unsupported
        validate_cluster(client, args.install)

        # Timestamped prefix under the volume: /Volumes/catalog/schema/volume/<ts>
        remote_fname_prefix = f'{upload_to}/{now}'
        print(f'Uploading artifacts to Unity Catalog volume: {remote_fname_prefix}')

        uninstalled_lib = False
        if jar_path is not None:
            uninstalled_lib |= _upload_and_install(client, args.install, jar_path, remote_fname_prefix, 'jar')
        if whl_path is not None:
            uninstalled_lib |= _upload_and_install(client, args.install, whl_path, remote_fname_prefix, 'whl')

        # Only a restart makes replaced libraries take effect on a live cluster
        if uninstalled_lib and client.clusters.get(args.install).state in [State.RUNNING, State.RESIZING]:
            print(f'Restarting cluster so new libraries will take effect')
            client.clusters.restart(args.install)
if __name__ == '__main__':
    # CLI entry point. Fix: wrapped in a main guard so importing this file
    # (e.g. for reuse or testing) no longer parses arguments and runs a build.
    parser = argparse.ArgumentParser(description='''
A script to build Glow artifacts and install them on a Databricks cluster. This script assumes that
the local environment is already set up (conda environment, sbt and Java installation) for whichever artifacts are requested, and
if installation is requested, the cluster already exists.
Any artifacts uploaded to volumes are not automatically deleted. Deletion should be performed manually or with a cloud storage retention policy.''')
    parser.add_argument('--python', help='Build a Python wheel', action='store_true')
    parser.add_argument('--scala', help='Build a Scala assembly jar', action='store_true')
    parser.add_argument('--install', metavar='CLUSTER_ID',
                        help='If provided, install built artifacts on this cluster. If currently running, the cluster will be restarted. ' +
                        'Requires --upload-to to be specified. Databricks authentication must be provided via environment variables')
    parser.add_argument('--upload-to', metavar='VOLUME_PATH',
                        help='Upload artifacts to this Unity Catalog volume path (required when --install is specified). ' +
                        'Must be a volume path in the format /Volumes/catalog/schema/volume. Trailing slashes will be stripped.')
    main(parser.parse_args())