-
Notifications
You must be signed in to change notification settings - Fork 940
Add kubernetes labels #1236
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add kubernetes labels #1236
Changes from all commits
fdd77d5
cd1d1fb
0fed321
7ab96b5
7e9d111
88828c0
3142898
1e510ec
adead0c
f1e53c8
a435900
21932a4
0499a22
50847db
7bba536
6f1feef
848fe7a
cfedfbf
55d0ea8
10b25a6
5b55a24
0ff88d5
22d4be8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,7 +1,10 @@ | ||
| import hashlib | ||
| import json | ||
| import os | ||
| import platform | ||
| import re | ||
| import sys | ||
dhpollack marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| from typing import Dict, List, Optional, Union | ||
|
|
||
| from metaflow.decorators import StepDecorator | ||
| from metaflow.exception import MetaflowException | ||
|
|
@@ -12,11 +15,12 @@ | |
| KUBERNETES_CONTAINER_IMAGE, | ||
| KUBERNETES_CONTAINER_REGISTRY, | ||
| KUBERNETES_GPU_VENDOR, | ||
| KUBERNETES_LABELS, | ||
| KUBERNETES_NAMESPACE, | ||
| KUBERNETES_NODE_SELECTOR, | ||
| KUBERNETES_TOLERATIONS, | ||
| KUBERNETES_SERVICE_ACCOUNT, | ||
| KUBERNETES_SECRETS, | ||
| KUBERNETES_SERVICE_ACCOUNT, | ||
| KUBERNETES_FETCH_EC2_METADATA, | ||
| ) | ||
| from metaflow.plugins.resources_decorator import ResourcesDecorator | ||
|
|
@@ -65,6 +69,8 @@ class KubernetesDecorator(StepDecorator): | |
| in Metaflow configuration. | ||
| tolerations : List[str], default: METAFLOW_KUBERNETES_TOLERATIONS | ||
| Kubernetes tolerations to use when launching pod in Kubernetes. | ||
| labels : Dict[str, str], default: METAFLOW_KUBERNETES_LABELS | ||
| Kubernetes labels to use when launching pod in Kubernetes. | ||
| """ | ||
|
|
||
| name = "kubernetes" | ||
|
|
@@ -76,6 +82,7 @@ class KubernetesDecorator(StepDecorator): | |
| "service_account": None, | ||
| "secrets": None, # e.g., mysecret | ||
| "node_selector": None, # e.g., kubernetes.io/os=linux | ||
| "labels": None, # e.g., my_label=my_value | ||
| "namespace": None, | ||
| "gpu": None, # value of 0 implies that the scheduled node should not have GPUs | ||
| "gpu_vendor": None, | ||
|
|
@@ -99,9 +106,17 @@ def __init__(self, attributes=None, statically_defined=False): | |
| self.attributes["node_selector"] = KUBERNETES_NODE_SELECTOR | ||
| if not self.attributes["tolerations"] and KUBERNETES_TOLERATIONS: | ||
| self.attributes["tolerations"] = json.loads(KUBERNETES_TOLERATIONS) | ||
| if not self.attributes["labels"] and KUBERNETES_LABELS: | ||
| self.attributes["labels"] = KUBERNETES_LABELS | ||
|
|
||
| if isinstance(self.attributes["labels"], str): | ||
| self.attributes["labels"] = self.parse_kube_keyvalue_list( | ||
| self.attributes["labels"].split(","), False | ||
| ) | ||
| self.validate_kube_labels(self.attributes["labels"]) | ||
|
|
||
| if isinstance(self.attributes["node_selector"], str): | ||
| self.attributes["node_selector"] = self.parse_node_selector( | ||
| self.attributes["node_selector"] = self.parse_kube_keyvalue_list( | ||
| self.attributes["node_selector"].split(",") | ||
| ) | ||
|
|
||
|
|
@@ -280,10 +295,11 @@ def runtime_step_cli( | |
| for k, v in self.attributes.items(): | ||
| if k == "namespace": | ||
| cli_args.command_options["k8s_namespace"] = v | ||
| elif k == "node_selector" and v: | ||
| cli_args.command_options[k] = ",".join( | ||
| ["=".join([key, str(val)]) for key, val in v.items()] | ||
| ) | ||
| elif k in {"node_selector", "labels"} and v: | ||
| cli_args.command_options[k] = [ | ||
| "=".join([key, str(val)]) if val else key | ||
| for key, val in v.items() | ||
| ] | ||
| elif k == "tolerations": | ||
| cli_args.command_options[k] = json.dumps(v) | ||
| else: | ||
|
|
@@ -391,14 +407,80 @@ def _save_package_once(cls, flow_datastore, package): | |
| [package.blob], len_hint=1 | ||
| )[0] | ||
|
|
||
| @classmethod | ||
| def _parse_decorator_spec(cls, deco_spec: str): | ||
| if not deco_spec: | ||
| return cls() | ||
|
|
||
| valid_options = "|".join(cls.defaults.keys()) | ||
| deco_spec_parts = [] | ||
| for part in re.split(""",(?=[\s\w]+[{}]=)""".format(valid_options), deco_spec): | ||
| name, val = part.split("=", 1) | ||
| if name in {"labels", "node_selector"}: | ||
| try: | ||
| tmp_vals = json.loads(val.strip().replace('\\"', '"')) | ||
| for val_i in tmp_vals.values(): | ||
| if not (val_i is None or isinstance(val_i, str)): | ||
| raise KubernetesException( | ||
| "All values must be string or null." | ||
| ) | ||
| except json.JSONDecodeError: | ||
| if val.startswith("{"): | ||
| raise KubernetesException( | ||
| "Malform json detected in %s" % str(val) | ||
| ) | ||
| both = name == "node_selector" | ||
| val = json.dumps( | ||
| cls.parse_kube_keyvalue_list(val.split(","), both), | ||
| separators=(",", ":"), | ||
| ) | ||
| deco_spec_parts.append("=".join([name, val])) | ||
| deco_spec_parsed = ",".join(deco_spec_parts) | ||
| return super()._parse_decorator_spec(deco_spec_parsed) | ||
|
|
||
| @staticmethod | ||
| def parse_node_selector(node_selector: list): | ||
| def parse_kube_keyvalue_list(items: List[str], requires_both: bool = True): | ||
| try: | ||
| return { | ||
| str(k.split("=", 1)[0]): str(k.split("=", 1)[1]) | ||
| for k in node_selector or [] | ||
| } | ||
| ret = {} | ||
| for item_str in items: | ||
| item = item_str.split("=", 1) | ||
| if requires_both: | ||
| item[1] # raise IndexError | ||
| if str(item[0]) in ret: | ||
| raise KubernetesException("Duplicate key found: %s" % str(item[0])) | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In its current form the exception does not convey where the duplicate key was found, as it makes no mention of 'labels' when I set duplicate labels. As this is used for checks other than the labels as well, A more suitable place to perform the duplicate label check might be in the
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This function is used on both labels and node selectors and putting the check here ensures that we catch duplicate keys in both sets. |
||
| ret[str(item[0])] = str(item[1]) if len(item) > 1 else None | ||
| return ret | ||
| except KubernetesException as e: | ||
| raise e | ||
| except (AttributeError, IndexError): | ||
| raise KubernetesException( | ||
| "Unable to parse node_selector: %s" % node_selector | ||
| ) | ||
| raise KubernetesException("Unable to parse kubernetes list: %s" % items) | ||
|
|
||
| @staticmethod | ||
| def validate_kube_labels( | ||
| labels: Optional[Dict[str, Optional[str]]], | ||
| ) -> bool: | ||
| """Validate label values. | ||
|
|
||
| This validates the kubernetes label values. It does not validate the keys. | ||
| Ideally, keys should be static and also the validation rules for keys are | ||
| more complex than those for values. For full validation rules, see: | ||
|
|
||
| https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#syntax-and-character-set | ||
| """ | ||
|
|
||
| def validate_label(s: Optional[str]): | ||
| regex_match = r"^(([A-Za-z0-9][-A-Za-z0-9_.]{0,61})?[A-Za-z0-9])?$" | ||
| if not s: | ||
| # allow empty label | ||
| return True | ||
| if not re.search(regex_match, s): | ||
| raise KubernetesException( | ||
| 'Invalid value: "%s"\n' | ||
| "A valid label must be an empty string or one that\n" | ||
| " - Consist of alphanumeric, '-', '_' or '.' characters\n" | ||
| " - Begins and ends with an alphanumeric character\n" | ||
| " - Is at most 63 characters" % s | ||
| ) | ||
| return True | ||
|
|
||
| return all([validate_label(v) for v in labels.values()]) if labels else True | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,94 @@ | ||
| import pytest | ||
|
|
||
| from metaflow.plugins.kubernetes.kubernetes import KubernetesException | ||
| from metaflow.plugins.kubernetes.kubernetes_decorator import KubernetesDecorator | ||
|
|
||
|
|
||
| @pytest.mark.parametrize( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is awesome! Thank you so much for doing this! |
||
| "labels", | ||
| [ | ||
| None, | ||
| {"label": "value"}, | ||
| {"label1": "val1", "label2": "val2"}, | ||
| {"label1": "val1", "label2": None}, | ||
| {"label": "a"}, | ||
| {"label": ""}, | ||
| { | ||
| "label": ( | ||
| "1234567890" | ||
| "1234567890" | ||
| "1234567890" | ||
| "1234567890" | ||
| "1234567890" | ||
| "1234567890" | ||
| "123" | ||
| ) | ||
| }, | ||
| { | ||
| "label": ( | ||
| "1234567890" | ||
| "1234567890" | ||
| "1234-_.890" | ||
| "1234567890" | ||
| "1234567890" | ||
| "1234567890" | ||
| "123" | ||
| ) | ||
| }, | ||
| ], | ||
| ) | ||
| def test_kubernetes_decorator_validate_kube_labels(labels): | ||
| assert KubernetesDecorator.validate_kube_labels(labels) | ||
|
|
||
|
|
||
| @pytest.mark.parametrize( | ||
| "labels", | ||
| [ | ||
| {"label": "a-"}, | ||
| {"label": ".a"}, | ||
| {"label": "test()"}, | ||
| { | ||
| "label": ( | ||
| "1234567890" | ||
| "1234567890" | ||
| "1234567890" | ||
| "1234567890" | ||
| "1234567890" | ||
| "1234567890" | ||
| "1234" | ||
| ) | ||
| }, | ||
| {"label": "(){}??"}, | ||
| {"valid": "test", "invalid": "bißchen"}, | ||
| ], | ||
| ) | ||
| def test_kubernetes_decorator_validate_kube_labels_fail(labels): | ||
| """Fail if label contains invalid characters or is too long""" | ||
| with pytest.raises(KubernetesException): | ||
| KubernetesDecorator.validate_kube_labels(labels) | ||
|
|
||
|
|
||
| @pytest.mark.parametrize( | ||
| "items,requires_both,expected", | ||
| [ | ||
| (["key=value"], True, {"key": "value"}), | ||
| (["key=value"], False, {"key": "value"}), | ||
| (["key"], False, {"key": None}), | ||
| (["key=value", "key2=value2"], True, {"key": "value", "key2": "value2"}), | ||
| ], | ||
| ) | ||
| def test_kubernetes_parse_keyvalue_list(items, requires_both, expected): | ||
| ret = KubernetesDecorator.parse_kube_keyvalue_list(items, requires_both) | ||
| assert ret == expected | ||
|
|
||
|
|
||
| @pytest.mark.parametrize( | ||
| "items,requires_both", | ||
| [ | ||
| (["key=value", "key=value2"], True), | ||
| (["key"], True), | ||
| ], | ||
| ) | ||
| def test_kubernetes_parse_keyvalue_list(items, requires_both): | ||
| with pytest.raises(KubernetesException): | ||
| KubernetesDecorator.parse_kube_keyvalue_list(items, requires_both) | ||
Uh oh!
There was an error while loading. Please reload this page.