# A URL prefix from which to fetch AMI information
AMI_PREFIX = "https://raw.github.com/mesos/spark-ec2/v2/ami-list"
4242
43+
class UsageError(Exception):
    """Script-specific exception type; adds no behavior beyond Exception."""
    pass
4546
@@ -449,45 +450,38 @@ def launch_cluster(conn, opts, cluster_name):
449450 print "Launched master in %s, regid = %s" % (zone , master_res .id )
450451
451452 # Give the instances descriptive names
453+ # TODO: Add retry logic for tagging with name since it's used to identify a cluster.
452454 for master in master_nodes :
453455 name = '{cn}-master-{iid}' .format (cn = cluster_name , iid = master .id )
454- tag_instance (master , name )
456+ for i in range (0 , 5 ):
457+ try :
458+ master .add_tag (key = 'Name' , value = name )
459+ except :
460+ print "Failed attempt %i of 5 to tag %s" % ((i + 1 ), name )
461+ if (i == 5 ):
462+ raise "Error - failed max attempts to add name tag"
463+ time .sleep (5 )
464+
455465
456466 for slave in slave_nodes :
457467 name = '{cn}-slave-{iid}' .format (cn = cluster_name , iid = slave .id )
458- tag_instance (slave , name )
468+ for i in range (0 , 5 ):
469+ try :
470+ slave .add_tag (key = 'Name' , value = name )
471+ except :
472+ print "Failed attempt %i of 5 to tag %s" % ((i + 1 ), name )
473+ if (i == 5 ):
474+ raise "Error - failed max attempts to add name tag"
475+ time .sleep (5 )
459476
460477 # Return all the instances
461478 return (master_nodes , slave_nodes )
462479
def tag_instance(instance, name, attempts=5, delay=5):
    """Set the 'Name' tag on an instance, retrying when tagging fails.

    instance -- object exposing add_tag(key=..., value=...)
    name     -- value to store under the 'Name' tag
    attempts -- maximum number of tagging attempts (default 5)
    delay    -- seconds to sleep between failed attempts (default 5)

    Returns None as soon as one attempt succeeds. Raises RuntimeError when
    every attempt fails (or when attempts is not positive, in which case no
    tagging is tried at all).
    """
    for attempt in range(1, attempts + 1):
        try:
            instance.add_tag(key='Name', value=name)
            # Success: stop here rather than re-applying the tag on every
            # remaining iteration.
            return
        except Exception:
            # Deliberately not a bare except, so that KeyboardInterrupt and
            # SystemExit still propagate. The single-argument print form
            # behaves the same under the Python 2 print statement.
            print("Failed attempt %i of %i to tag %s" % (attempt, attempts, name))
            # No point in sleeping after the last attempt has failed.
            if attempt < attempts:
                time.sleep(delay)
    # Raise a real exception object; raising a bare string is a TypeError.
    raise RuntimeError("Error - failed max attempts to add name tag")
472480
473481# Get the EC2 instances in an existing cluster if available.
474482# Returns a tuple of lists of EC2 instance objects for the masters and slaves
475483def get_existing_cluster (conn , opts , cluster_name , die_on_error = True ):
476484 print "Searching for existing cluster " + cluster_name + "..."
477- # Search all the spot instance requests, and copy any tags from the spot instance request to the cluster.
478- spot_instance_requests = conn .get_all_spot_instance_requests ()
479- for req in spot_instance_requests :
480- if req .state != u'active' :
481- continue
482- name = req .tags .get (u'Name' , "" )
483- if name .startswith (cluster_name ):
484- reservations = conn .get_all_instances (instance_ids = [req .instance_id ])
485- for res in reservations :
486- active = [i for i in res .instances if is_active (i )]
487- for instance in active :
488- if (instance .tags .get (u'Name' ) == None ):
489- tag_instance (instance , name )
490- # Now proceed to detect master and slaves instances.
491485 reservations = conn .get_all_instances ()
492486 master_nodes = []
493487 slave_nodes = []
@@ -510,6 +504,7 @@ def get_existing_cluster(conn, opts, cluster_name, die_on_error=True):
510504 print >> sys .stderr , "ERROR: Could not find any existing cluster"
511505 sys .exit (1 )
512506
507+
513508# Deploy configuration files and run setup scripts on a newly launched
514509# or started EC2 cluster.
515510def setup_cluster (conn , master_nodes , slave_nodes , opts , deploy_ssh_key ):
0 commit comments