Skip to content

Commit fe5c4da

Browse files
Quest-674: Changes to support BYOC/BYOJ pipeline creation (#325)
1 parent 96d16fd commit fe5c4da

File tree

2 files changed

+56
-9
lines changed

2 files changed

+56
-9
lines changed

qds_sdk/pipelines.py

Lines changed: 55 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -339,23 +339,70 @@ def create(cls, pipeline_name, create_type, **kwargs):
339339
340340
Args:
341341
pipeline_name: Name to be given.
342-
create_type: 1->Assisted, 2->Code, 3->Jar
342+
create_type: 1->Assisted, 2->Jar, 3->Code
343343
**kwargs: keyword arguments specific to create type
344344
345345
Returns:
346346
response
347347
"""
348348
conn = Qubole.agent()
349-
data = {"data": {
350-
"attributes":
351-
{"name": pipeline_name, "status": "DRAFT",
352-
"create_type": create_type},
353-
"type": "pipelines"}
354-
}
355-
url = Pipelines.rest_entity_path + "?mode=wizard"
349+
url = Pipelines.rest_entity_path
350+
if create_type is None:
351+
raise ParseError("Provide create_type for Pipeline.", None)
352+
if not kwargs or create_type == 1:
353+
data = {
354+
"data": {
355+
"attributes": {
356+
"name": pipeline_name,
357+
"status": "DRAFT",
358+
"create_type": create_type
359+
},
360+
"type": "pipeline"
361+
}
362+
}
363+
url = url + "?mode=wizard"
364+
else:
365+
data = {
366+
"data": {
367+
"type": "pipeline",
368+
"attributes": {
369+
"name": pipeline_name,
370+
"create_type": create_type,
371+
"properties": {
372+
"cluster_label": kwargs.get('cluster_label'),
373+
"can_retry": kwargs.get('can_retry'),
374+
"command_line_options": kwargs.get('command_line_options'),
375+
"user_arguments": kwargs.get('user_arguments')
376+
}
377+
},
378+
"relationships": {
379+
"alerts": {
380+
"data": {
381+
"type": "pipeline/alerts",
382+
"attributes": {
383+
"can_notify": kwargs.get('can_notify'),
384+
"notification_channels": kwargs.get('channel_ids')
385+
}
386+
}
387+
}
388+
}
389+
}
390+
}
391+
if create_type == 2:
392+
data['data']['attributes']['properties']['jar_path'] = \
393+
kwargs.get('jar_path')
394+
data['data']['attributes']['properties']['main_class_name'] = \
395+
kwargs.get('main_class_name')
396+
elif create_type == 3:
397+
data['data']['attributes']['properties']['code'] = \
398+
kwargs.get('code')
399+
data['data']['attributes']['properties']['language'] = \
400+
kwargs.get('language')
401+
356402
response = conn.post(url, data)
357403
cls.pipeline_id = Pipelines.get_pipline_id(response)
358404
cls.pipeline_name = pipeline_name
405+
return response
359406

360407
@staticmethod
361408
def start(pipeline_id):

tests/test_quest.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ def test_create_pipeline(self):
6464
'--cluster-label', 'spark', '-c', 'print("hello")', '--language', 'python', '--user-arguments', 'users_argument']
6565
print_command()
6666
d1 = {"data": {"attributes": {"name": "test_pipeline_name", "status": "DRAFT", "create_type": 3},
67-
"type": "pipelines"}}
67+
"type": "pipeline"}}
6868
response = {"relationships": {"nodes": [], "alerts": []}, "included": [],
6969
"meta": {"command_details": {"code": "print(\"hello\")", "language": "python"},
7070
"properties": {"checkpoint_location": None, "trigger_interval": None,

0 commit comments

Comments
 (0)