Skip to content

Commit c2ca2d4

Browse files
committed
Revert "Spark-3213 Fixes issue with spark-ec2 not detecting slaves created with "Launch More like this""
This reverts commit 3cb4e17.
1 parent 865e6f6 commit c2ca2d4

File tree

1 file changed

+20
-25
lines changed

1 file changed

+20
-25
lines changed

ec2/spark_ec2.py

Lines changed: 20 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
# A URL prefix from which to fetch AMI information
4141
AMI_PREFIX = "https://raw.github.com/mesos/spark-ec2/v2/ami-list"
4242

43+
4344
class UsageError(Exception):
4445
pass
4546

@@ -449,45 +450,38 @@ def launch_cluster(conn, opts, cluster_name):
449450
print "Launched master in %s, regid = %s" % (zone, master_res.id)
450451

451452
# Give the instances descriptive names
453+
# TODO: Add retry logic for tagging with name since it's used to identify a cluster.
452454
for master in master_nodes:
453455
name = '{cn}-master-{iid}'.format(cn=cluster_name, iid=master.id)
454-
tag_instance(master, name)
456+
for i in range(0, 5):
457+
try:
458+
master.add_tag(key='Name', value=name)
459+
except:
460+
print "Failed attempt %i of 5 to tag %s" % ((i + 1), name)
461+
if (i == 5):
462+
raise "Error - failed max attempts to add name tag"
463+
time.sleep(5)
464+
455465

456466
for slave in slave_nodes:
457467
name = '{cn}-slave-{iid}'.format(cn=cluster_name, iid=slave.id)
458-
tag_instance(slave, name)
468+
for i in range(0, 5):
469+
try:
470+
slave.add_tag(key='Name', value=name)
471+
except:
472+
print "Failed attempt %i of 5 to tag %s" % ((i + 1), name)
473+
if (i == 5):
474+
raise "Error - failed max attempts to add name tag"
475+
time.sleep(5)
459476

460477
# Return all the instances
461478
return (master_nodes, slave_nodes)
462479

463-
def tag_instance(instance, name):
464-
for i in range(0, 5):
465-
try:
466-
instance.add_tag(key='Name', value=name)
467-
except:
468-
print "Failed attempt %i of 5 to tag %s" % ((i + 1), name)
469-
if (i == 5):
470-
raise "Error - failed max attempts to add name tag"
471-
time.sleep(5)
472480

473481
# Get the EC2 instances in an existing cluster if available.
474482
# Returns a tuple of lists of EC2 instance objects for the masters and slaves
475483
def get_existing_cluster(conn, opts, cluster_name, die_on_error=True):
476484
print "Searching for existing cluster " + cluster_name + "..."
477-
# Search all the spot instance requests, and copy any tags from the spot instance request to the cluster.
478-
spot_instance_requests = conn.get_all_spot_instance_requests()
479-
for req in spot_instance_requests:
480-
if req.state != u'active':
481-
continue
482-
name = req.tags.get(u'Name', "")
483-
if name.startswith(cluster_name):
484-
reservations = conn.get_all_instances(instance_ids=[req.instance_id])
485-
for res in reservations:
486-
active = [i for i in res.instances if is_active(i)]
487-
for instance in active:
488-
if (instance.tags.get(u'Name') == None):
489-
tag_instance(instance, name)
490-
# Now proceed to detect master and slaves instances.
491485
reservations = conn.get_all_instances()
492486
master_nodes = []
493487
slave_nodes = []
@@ -510,6 +504,7 @@ def get_existing_cluster(conn, opts, cluster_name, die_on_error=True):
510504
print >> sys.stderr, "ERROR: Could not find any existing cluster"
511505
sys.exit(1)
512506

507+
513508
# Deploy configuration files and run setup scripts on a newly launched
514509
# or started EC2 cluster.
515510
def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key):

0 commit comments

Comments
 (0)