diff --git a/scripts/bidmach_ec2.py b/scripts/bidmach_ec2.py index bbb4e24c..e0c07e15 100755 --- a/scripts/bidmach_ec2.py +++ b/scripts/bidmach_ec2.py @@ -24,11 +24,11 @@ # What this script does: # Launch (create and start) a cluster, start, stop and login to it. # Does not install any software -# Configures Spark and Hadoop installations on the Master and Slaves +# Configures Spark and Hadoop installations on the Main and Subordinates # # What this script assumes: # (optional Spark and) Hadoop should be installed on the ec2 image -# Hadoop should be configured to run on port 9000 on the master +# Hadoop should be configured to run on port 9000 on the main from __future__ import division, print_function, with_statement @@ -171,14 +171,14 @@ def parse_args(): prog="launch_cluster", version="1.00", usage="%prog [options] \n\n" - + " can be: launch, destroy, login, stop, start, get-master, get-slaves, reboot-slaves") + + " can be: launch, destroy, login, stop, start, get-main, get-subordinates, reboot-subordinates") parser.add_option( - "-s", "--slaves", type="int", default=1, - help="Number of slaves to launch (default: %default)") + "-s", "--subordinates", type="int", default=1, + help="Number of subordinates to launch (default: %default)") parser.add_option( "-n", "--node", type="int", - help="Number of slave to login to") + help="Number of subordinate to login to") parser.add_option( "-w", "--wait", type="int", help="DEPRECATED (no longer necessary) - Seconds to wait for nodes to start") @@ -193,15 +193,15 @@ def parse_args(): help="Type of instance to launch (default: %default). " + "WARNING: must be 64-bit; small instances won't work") parser.add_option( - "-m", "--master-instance-type", default="", - help="Master instance type (leave empty for same as instance-type)") + "-m", "--main-instance-type", default="", + help="Main instance type (leave empty for same as instance-type)") parser.add_option( "-r", "--region", default="us-east-1", help="EC2 region used to launch instances in, or to find them in (default: %default)") parser.add_option( "-z", "--zone", default="", help="Availability zone to launch instances in, or 'all' to spread " + - "slaves across multiple (an additional $0.01/Gb for bandwidth" + + "subordinates across multiple (an additional $0.01/Gb for bandwidth" + "between zones applies) (default: a single zone chosen at random)") parser.add_option( "-a", "--ami", @@ -233,7 +233,7 @@ def parse_args(): help="Swap space to set up per node, in MB (default: %default)") parser.add_option( "--spot-price", metavar="PRICE", type="float", - help="If specified, launch slaves as spot instances with the given " + + help="If specified, launch subordinates as spot instances with the given " + "maximum price (in dollars)") parser.add_option( "--ganglia", action="store_true", default=True, @@ -249,15 +249,15 @@ def parse_args(): "--delete-groups", action="store_true", default=False, help="When destroying a cluster, delete the security groups that were created") parser.add_option( - "--use-existing-master", action="store_true", default=False, - help="Launch fresh slaves, but use an existing stopped master if possible") + "--use-existing-main", action="store_true", default=False, + help="Launch fresh subordinates, but use an existing stopped main if possible") parser.add_option( "--worker-instances", type="int", default=1, help="Number of instances per worker: variable SPARK_WORKER_INSTANCES. Not used if YARN " + "is used as Hadoop major version (default: %default)") parser.add_option( - "--master-opts", type="string", default="", - help="Extra options to give to master through SPARK_MASTER_OPTS variable " + + "--main-opts", type="string", default="", + help="Extra options to give to main through SPARK_MASTER_OPTS variable " + "(e.g -Dspark.worker.timeout=180)") parser.add_option( "--user-data", type="string", default="", @@ -265,7 +265,7 @@ def parse_args(): parser.add_option( "--deploy-root-dir", default=None, - help="A directory to copy into / on the first master. " + + help="A directory to copy into / on the first main. " + "Must be absolute. Note that a trailing slash is handled as per rsync: " + "If you omit it, the last directory of the --deploy-root-dir path will be created " + "in / before copying its contents. If you append the trailing slash, " + @@ -406,7 +406,7 @@ def get_or_make_group(conn, name, vpc_id): # Launch a cluster of the given name, by setting up its security groups, # and then starting new instances in them. -# Returns a tuple of EC2 reservation objects for the master and slaves +# Returns a tuple of EC2 reservation objects for the main and subordinates # Fails if there already instances running in the cluster's groups. def launch_cluster(conn, opts, cluster_name): if opts.identity_file is None: @@ -423,91 +423,91 @@ def launch_cluster(conn, opts, cluster_name): user_data_content = user_data_file.read() print("Setting up security groups...") - master_group = get_or_make_group(conn, cluster_name + "-master", opts.vpc_id) - slave_group = get_or_make_group(conn, cluster_name + "-slaves", opts.vpc_id) + main_group = get_or_make_group(conn, cluster_name + "-main", opts.vpc_id) + subordinate_group = get_or_make_group(conn, cluster_name + "-subordinates", opts.vpc_id) authorized_address = opts.authorized_address - if master_group.rules == []: # Group was just now created + if main_group.rules == []: # Group was just now created if opts.vpc_id is None: - master_group.authorize(src_group=master_group) - master_group.authorize(src_group=slave_group) + main_group.authorize(src_group=main_group) + main_group.authorize(src_group=subordinate_group) else: - master_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1, - src_group=master_group) - master_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535, - src_group=master_group) - master_group.authorize(ip_protocol='udp', from_port=0, to_port=65535, - src_group=master_group) - master_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1, - src_group=slave_group) - master_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535, - src_group=slave_group) - master_group.authorize(ip_protocol='udp', from_port=0, to_port=65535, - src_group=slave_group) - master_group.authorize('tcp', 22, 22, authorized_address) - master_group.authorize('tcp', 8080, 8081, authorized_address) - master_group.authorize('tcp', 18080, 18080, authorized_address) - master_group.authorize('tcp', 19999, 19999, authorized_address) - master_group.authorize('tcp', 50030, 50030, authorized_address) - master_group.authorize('tcp', 50070, 50070, authorized_address) - master_group.authorize('tcp', 60070, 60070, authorized_address) - master_group.authorize('tcp', 4040, 4045, authorized_address) + main_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1, + src_group=main_group) + main_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535, + src_group=main_group) + main_group.authorize(ip_protocol='udp', from_port=0, to_port=65535, + src_group=main_group) + main_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1, + src_group=subordinate_group) + main_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535, + src_group=subordinate_group) + main_group.authorize(ip_protocol='udp', from_port=0, to_port=65535, + src_group=subordinate_group) + main_group.authorize('tcp', 22, 22, authorized_address) + main_group.authorize('tcp', 8080, 8081, authorized_address) + main_group.authorize('tcp', 18080, 18080, authorized_address) + main_group.authorize('tcp', 19999, 19999, authorized_address) + main_group.authorize('tcp', 50030, 50030, authorized_address) + main_group.authorize('tcp', 50070, 50070, authorized_address) + main_group.authorize('tcp', 60070, 60070, authorized_address) + main_group.authorize('tcp', 4040, 4045, authorized_address) # Rstudio (GUI for R) needs port 8787 for web access - master_group.authorize('tcp', 8787, 8787, authorized_address) + main_group.authorize('tcp', 8787, 8787, authorized_address) # HDFS NFS gateway requires 111,2049,4242 for tcp & udp - master_group.authorize('tcp', 111, 111, authorized_address) - master_group.authorize('udp', 111, 111, authorized_address) - master_group.authorize('tcp', 2049, 2049, authorized_address) - master_group.authorize('udp', 2049, 2049, authorized_address) - master_group.authorize('tcp', 4242, 4242, authorized_address) - master_group.authorize('udp', 4242, 4242, authorized_address) + main_group.authorize('tcp', 111, 111, authorized_address) + main_group.authorize('udp', 111, 111, authorized_address) + main_group.authorize('tcp', 2049, 2049, authorized_address) + main_group.authorize('udp', 2049, 2049, authorized_address) + main_group.authorize('tcp', 4242, 4242, authorized_address) + main_group.authorize('udp', 4242, 4242, authorized_address) # RM in YARN mode uses 8088 - master_group.authorize('tcp', 8088, 8088, authorized_address) + main_group.authorize('tcp', 8088, 8088, authorized_address) if opts.ganglia: - master_group.authorize('tcp', 5080, 5080, authorized_address) - if slave_group.rules == []: # Group was just now created + main_group.authorize('tcp', 5080, 5080, authorized_address) + if subordinate_group.rules == []: # Group was just now created if opts.vpc_id is None: - slave_group.authorize(src_group=master_group) - slave_group.authorize(src_group=slave_group) + subordinate_group.authorize(src_group=main_group) + subordinate_group.authorize(src_group=subordinate_group) else: - slave_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1, - src_group=master_group) - slave_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535, - src_group=master_group) - slave_group.authorize(ip_protocol='udp', from_port=0, to_port=65535, - src_group=master_group) - slave_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1, - src_group=slave_group) - slave_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535, - src_group=slave_group) - slave_group.authorize(ip_protocol='udp', from_port=0, to_port=65535, - src_group=slave_group) - slave_group.authorize('tcp', 22, 22, authorized_address) - slave_group.authorize('tcp', 8080, 8081, authorized_address) - slave_group.authorize('tcp', 50060, 50060, authorized_address) - slave_group.authorize('tcp', 50075, 50075, authorized_address) - slave_group.authorize('tcp', 60060, 60060, authorized_address) - slave_group.authorize('tcp', 60075, 60075, authorized_address) + subordinate_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1, + src_group=main_group) + subordinate_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535, + src_group=main_group) + subordinate_group.authorize(ip_protocol='udp', from_port=0, to_port=65535, + src_group=main_group) + subordinate_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1, + src_group=subordinate_group) + subordinate_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535, + src_group=subordinate_group) + subordinate_group.authorize(ip_protocol='udp', from_port=0, to_port=65535, + src_group=subordinate_group) + subordinate_group.authorize('tcp', 22, 22, authorized_address) + subordinate_group.authorize('tcp', 8080, 8081, authorized_address) + subordinate_group.authorize('tcp', 50060, 50060, authorized_address) + subordinate_group.authorize('tcp', 50075, 50075, authorized_address) + subordinate_group.authorize('tcp', 60060, 60060, authorized_address) + subordinate_group.authorize('tcp', 60075, 60075, authorized_address) #Kylix - slave_group.authorize(ip_protocol='tcp', from_port=50050, to_port=50060, - src_group=slave_group) - slave_group.authorize(ip_protocol='udp', from_port=50050, to_port=50060, - src_group=slave_group) - slave_group.authorize(ip_protocol='tcp', from_port=50050, to_port=50060, - src_group=master_group) - slave_group.authorize(ip_protocol='udp', from_port=50050, to_port=50060, - src_group=master_group) - master_group.authorize(ip_protocol='tcp', from_port=50050, to_port=50060, - src_group=slave_group) - master_group.authorize(ip_protocol='udp', from_port=50050, to_port=50060, - src_group=slave_group) + subordinate_group.authorize(ip_protocol='tcp', from_port=50050, to_port=50060, + src_group=subordinate_group) + subordinate_group.authorize(ip_protocol='udp', from_port=50050, to_port=50060, + src_group=subordinate_group) + subordinate_group.authorize(ip_protocol='tcp', from_port=50050, to_port=50060, + src_group=main_group) + subordinate_group.authorize(ip_protocol='udp', from_port=50050, to_port=50060, + src_group=main_group) + main_group.authorize(ip_protocol='tcp', from_port=50050, to_port=50060, + src_group=subordinate_group) + main_group.authorize(ip_protocol='udp', from_port=50050, to_port=50060, + src_group=subordinate_group) # Check if instances are already running in our groups - existing_masters, existing_slaves = get_existing_cluster(conn, opts, cluster_name, + existing_mains, existing_subordinates = get_existing_cluster(conn, opts, cluster_name, die_on_error=False) - if existing_slaves or (existing_masters and not opts.use_existing_master): + if existing_subordinates or (existing_mains and not opts.use_existing_main): print("ERROR: There are already instances running in group %s or %s" % - (master_group.name, slave_group.name), file=stderr) + (main_group.name, subordinate_group.name), file=stderr) sys.exit(1) # we use group ids to work around https://github.com/boto/boto/issues/350 @@ -544,32 +544,32 @@ def launch_cluster(conn, opts, cluster_name): name = '/dev/sd' + string.letters[i + 1] block_map[name] = dev - # Launch slaves + # Launch subordinates if opts.spot_price is not None: # Launch spot instances with the requested price - print("Requesting %d slaves as spot instances with price $%.3f" % - (opts.slaves, opts.spot_price)) + print("Requesting %d subordinates as spot instances with price $%.3f" % + (opts.subordinates, opts.spot_price)) zones = get_zones(conn, opts) num_zones = len(zones) i = 0 my_req_ids = [] for zone in zones: - num_slaves_this_zone = get_partition(opts.slaves, num_zones, i) - slave_reqs = conn.request_spot_instances( + num_subordinates_this_zone = get_partition(opts.subordinates, num_zones, i) + subordinate_reqs = conn.request_spot_instances( price=opts.spot_price, image_id=opts.ami, launch_group="launch-group-%s" % cluster_name, placement=zone, - count=num_slaves_this_zone, + count=num_subordinates_this_zone, key_name=opts.key_pair, - security_group_ids=[slave_group.id] + additional_group_ids, + security_group_ids=[subordinate_group.id] + additional_group_ids, instance_type=opts.instance_type, block_device_map=block_map, subnet_id=opts.subnet_id, placement_group=opts.placement_group, user_data=user_data_content, instance_profile_name=opts.instance_profile_name) - my_req_ids += [req.id for req in slave_reqs] + my_req_ids += [req.id for req in subordinate_reqs] i += 1 print("Waiting for spot instances to be granted...") @@ -584,23 +584,23 @@ def launch_cluster(conn, opts, cluster_name): for i in my_req_ids: if i in id_to_req and id_to_req[i].state == "active": active_instance_ids.append(id_to_req[i].instance_id) - if len(active_instance_ids) == opts.slaves: - print("All %d slaves granted" % opts.slaves) + if len(active_instance_ids) == opts.subordinates: + print("All %d subordinates granted" % opts.subordinates) reservations = conn.get_all_reservations(active_instance_ids) - slave_nodes = [] + subordinate_nodes = [] for r in reservations: - slave_nodes += r.instances + subordinate_nodes += r.instances break else: - print("%d of %d slaves granted, waiting longer" % ( - len(active_instance_ids), opts.slaves)) + print("%d of %d subordinates granted, waiting longer" % ( + len(active_instance_ids), opts.subordinates)) except: print("Canceling spot instance requests") conn.cancel_spot_instance_requests(my_req_ids) # Log a warning if any of these requests actually launched instances: - (master_nodes, slave_nodes) = get_existing_cluster( + (main_nodes, subordinate_nodes) = get_existing_cluster( conn, opts, cluster_name, die_on_error=False) - running = len(master_nodes) + len(slave_nodes) + running = len(main_nodes) + len(subordinate_nodes) if running: print(("WARNING: %d instances are still running" % running), file=stderr) sys.exit(0) @@ -609,48 +609,48 @@ def launch_cluster(conn, opts, cluster_name): zones = get_zones(conn, opts) num_zones = len(zones) i = 0 - slave_nodes = [] + subordinate_nodes = [] for zone in zones: - num_slaves_this_zone = get_partition(opts.slaves, num_zones, i) - if num_slaves_this_zone > 0: - slave_res = image.run( + num_subordinates_this_zone = get_partition(opts.subordinates, num_zones, i) + if num_subordinates_this_zone > 0: + subordinate_res = image.run( key_name=opts.key_pair, - security_group_ids=[slave_group.id] + additional_group_ids, + security_group_ids=[subordinate_group.id] + additional_group_ids, instance_type=opts.instance_type, placement=zone, - min_count=num_slaves_this_zone, - max_count=num_slaves_this_zone, + min_count=num_subordinates_this_zone, + max_count=num_subordinates_this_zone, block_device_map=block_map, subnet_id=opts.subnet_id, placement_group=opts.placement_group, user_data=user_data_content, instance_initiated_shutdown_behavior=opts.instance_initiated_shutdown_behavior, instance_profile_name=opts.instance_profile_name) - slave_nodes += slave_res.instances - print("Launched {s} slave{plural_s} in {z}, regid = {r}".format( - s=num_slaves_this_zone, - plural_s=('' if num_slaves_this_zone == 1 else 's'), + subordinate_nodes += subordinate_res.instances + print("Launched {s} subordinate{plural_s} in {z}, regid = {r}".format( + s=num_subordinates_this_zone, + plural_s=('' if num_subordinates_this_zone == 1 else 's'), z=zone, - r=slave_res.id)) + r=subordinate_res.id)) i += 1 - # Launch or resume masters - if existing_masters: - print("Starting master...") - for inst in existing_masters: + # Launch or resume mains + if existing_mains: + print("Starting main...") + for inst in existing_mains: if inst.state not in ["shutting-down", "terminated"]: inst.start() - master_nodes = existing_masters + main_nodes = existing_mains else: - master_type = opts.master_instance_type - if master_type == "": - master_type = opts.instance_type + main_type = opts.main_instance_type + if main_type == "": + main_type = opts.instance_type if opts.zone == 'all': opts.zone = random.choice(conn.get_all_zones()).name - master_res = image.run( + main_res = image.run( key_name=opts.key_pair, - security_group_ids=[master_group.id] + additional_group_ids, - instance_type=master_type, + security_group_ids=[main_group.id] + additional_group_ids, + instance_type=main_type, placement=opts.zone, min_count=1, max_count=1, @@ -661,8 +661,8 @@ def launch_cluster(conn, opts, cluster_name): instance_initiated_shutdown_behavior=opts.instance_initiated_shutdown_behavior, instance_profile_name=opts.instance_profile_name) - master_nodes = master_res.instances - print("Launched master in %s, regid = %s" % (zone, master_res.id)) + main_nodes = main_res.instances + print("Launched main in %s, regid = %s" % (zone, main_res.id)) # This wait time corresponds to SPARK-4983 print("Waiting for AWS to propagate instance metadata...") @@ -675,24 +675,24 @@ def launch_cluster(conn, opts, cluster_name): map(str.strip, tag.split(':', 1)) for tag in opts.additional_tags.split(',') ) - for master in master_nodes: - master.add_tags( - dict(additional_tags, Name='{cn}-master-{iid}'.format(cn=cluster_name, iid=master.id)) + for main in main_nodes: + main.add_tags( + dict(additional_tags, Name='{cn}-main-{iid}'.format(cn=cluster_name, iid=main.id)) ) - for slave in slave_nodes: - slave.add_tags( - dict(additional_tags, Name='{cn}-slave-{iid}'.format(cn=cluster_name, iid=slave.id)) + for subordinate in subordinate_nodes: + subordinate.add_tags( + dict(additional_tags, Name='{cn}-subordinate-{iid}'.format(cn=cluster_name, iid=subordinate.id)) ) # Return all the instances - return (master_nodes, slave_nodes) + return (main_nodes, subordinate_nodes) def get_existing_cluster(conn, opts, cluster_name, die_on_error=True): """ Get the EC2 instances in an existing cluster if available. - Returns a tuple of lists of EC2 instance objects for the masters and slaves. + Returns a tuple of lists of EC2 instance objects for the mains and subordinates. """ print("Searching for existing cluster {c} in region {r}...".format( c=cluster_name, r=opts.region)) @@ -709,82 +709,82 @@ def get_instances(group_names): instances = itertools.chain.from_iterable(r.instances for r in reservations) return [i for i in instances if i.state not in ["shutting-down", "terminated"]] - master_instances = get_instances([cluster_name + "-master"]) - slave_instances = get_instances([cluster_name + "-slaves"]) + main_instances = get_instances([cluster_name + "-main"]) + subordinate_instances = get_instances([cluster_name + "-subordinates"]) - if any((master_instances, slave_instances)): - print("Found {m} master{plural_m}, {s} slave{plural_s}.".format( - m=len(master_instances), - plural_m=('' if len(master_instances) == 1 else 's'), - s=len(slave_instances), - plural_s=('' if len(slave_instances) == 1 else 's'))) + if any((main_instances, subordinate_instances)): + print("Found {m} main{plural_m}, {s} subordinate{plural_s}.".format( + m=len(main_instances), + plural_m=('' if len(main_instances) == 1 else 's'), + s=len(subordinate_instances), + plural_s=('' if len(subordinate_instances) == 1 else 's'))) - if not master_instances and die_on_error: - print("ERROR: Could not find a master for cluster {c} in region {r}.".format( + if not main_instances and die_on_error: + print("ERROR: Could not find a main for cluster {c} in region {r}.".format( c=cluster_name, r=opts.region), file=sys.stderr) sys.exit(1) - return (master_instances, slave_instances) + return (main_instances, subordinate_instances) # Deploy configuration files and run setup scripts on a newly launched # or started EC2 cluster. -def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key, copyfiles): - master = get_dns_name(master_nodes[0], False) - local_master = get_dns_name(master_nodes[0], opts.private_ips) +def setup_cluster(conn, main_nodes, subordinate_nodes, opts, deploy_ssh_key, copyfiles): + main = get_dns_name(main_nodes[0], False) + local_main = get_dns_name(main_nodes[0], opts.private_ips) if deploy_ssh_key: - print("Generating cluster's SSH key on master...") + print("Generating cluster's SSH key on main...") key_setup = """ (ssh-keygen -q -t rsa -N '' -f ~/.ssh/tmp_rsa && mv -f ~/.ssh/tmp_rsa ~/.ssh/id_rsa && mv -f ~/.ssh/tmp_rsa.pub ~/.ssh/id_rsa.pub && cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys) """ - ssh(master, opts, key_setup) - dot_ssh_tar = ssh_read(master, opts, ['tar', 'c', '.ssh']) - print("Transferring cluster's SSH key to slaves...") - for slave in slave_nodes: -# slave_address = get_dns_name(slave, opts.private_ips) - slave_address = get_dns_name(slave, False) - print(slave_address) - ssh_write(slave_address, opts, ['tar', 'x'], dot_ssh_tar) - - print("configuring master %s" % master); - - slave_names = [get_dns_name(i, False) for i in slave_nodes]; - local_slave_names = [get_dns_name(i, opts.private_ips) for i in slave_nodes]; - slaves_string = reduce(lambda x,y : x + "\n" + y, local_slave_names); + ssh(main, opts, key_setup) + dot_ssh_tar = ssh_read(main, opts, ['tar', 'c', '.ssh']) + print("Transferring cluster's SSH key to subordinates...") + for subordinate in subordinate_nodes: +# subordinate_address = get_dns_name(subordinate, opts.private_ips) + subordinate_address = get_dns_name(subordinate, False) + print(subordinate_address) + ssh_write(subordinate_address, opts, ['tar', 'x'], dot_ssh_tar) + + print("configuring main %s" % main); + + subordinate_names = [get_dns_name(i, False) for i in subordinate_nodes]; + local_subordinate_names = [get_dns_name(i, opts.private_ips) for i in subordinate_nodes]; + subordinates_string = reduce(lambda x,y : x + "\n" + y, local_subordinate_names); - bscommand="mkdir -p %s/conf; echo -e '%s' > %s/conf/slaves" % (bidmach_install_dir, slaves_string, bidmach_install_dir) - ssh(master, opts, bscommand.encode('ascii','ignore')) + bscommand="mkdir -p %s/conf; echo -e '%s' > %s/conf/subordinates" % (bidmach_install_dir, subordinates_string, bidmach_install_dir) + ssh(main, opts, bscommand.encode('ascii','ignore')) - bmcommand="echo -e '%s' > %s/conf/master" % (local_master, bidmach_install_dir) - ssh(master, opts, bmcommand.encode('ascii','ignore')) + bmcommand="echo -e '%s' > %s/conf/main" % (local_main, bidmach_install_dir) + ssh(main, opts, bmcommand.encode('ascii','ignore')) - hscommand="echo -e '%s' > %s/etc/hadoop/slaves" % (slaves_string, hadoop_install_dir) - ssh(master, opts, hscommand.encode('ascii','ignore')) + hscommand="echo -e '%s' > %s/etc/hadoop/subordinates" % (subordinates_string, hadoop_install_dir) + ssh(main, opts, hscommand.encode('ascii','ignore')) - core_conf = core_site % local_master + core_conf = core_site % local_main hccommand="echo '%s' > %s/etc/hadoop/core-site.xml" % (core_conf, hadoop_install_dir) - ssh(master, opts, hccommand.encode('ascii','ignore')) - - akka_master_conf = akka_conf % (local_master, local_master, local_master) - amcommand="echo '%s' > %s/conf/application.conf" % (akka_master_conf, bidmach_install_dir) - ssh(master, opts,amcommand.encode('ascii','ignore')) - - ssh(master, opts, """rm -f ~/.ssh/known_hosts""") - for i in range(len(slave_names)): - slave = slave_names[i]; - local_slave = local_slave_names[i]; - print("configuring slave %s" % slave) - ssh(slave, opts, """rm -f ~/.ssh/known_hosts""") - ssh(slave, opts, hccommand.encode('ascii','ignore')) - ssh(slave, opts, hscommand.encode('ascii','ignore')) - ssh(slave, opts, bscommand.encode('ascii','ignore')) - ssh(slave, opts, bmcommand.encode('ascii','ignore')) - akka_slave_conf = akka_conf % (local_slave, local_master, local_master) - ascommand="echo '%s' > %s/conf/application.conf" % (akka_slave_conf, bidmach_install_dir) - ssh(slave, opts, ascommand.encode('ascii','ignore')) + ssh(main, opts, hccommand.encode('ascii','ignore')) + + akka_main_conf = akka_conf % (local_main, local_main, local_main) + amcommand="echo '%s' > %s/conf/application.conf" % (akka_main_conf, bidmach_install_dir) + ssh(main, opts,amcommand.encode('ascii','ignore')) + + ssh(main, opts, """rm -f ~/.ssh/known_hosts""") + for i in range(len(subordinate_names)): + subordinate = subordinate_names[i]; + local_subordinate = local_subordinate_names[i]; + print("configuring subordinate %s" % subordinate) + ssh(subordinate, opts, """rm -f ~/.ssh/known_hosts""") + ssh(subordinate, opts, hccommand.encode('ascii','ignore')) + ssh(subordinate, opts, hscommand.encode('ascii','ignore')) + ssh(subordinate, opts, bscommand.encode('ascii','ignore')) + ssh(subordinate, opts, bmcommand.encode('ascii','ignore')) + akka_subordinate_conf = akka_conf % (local_subordinate, local_main, local_main) + ascommand="echo '%s' > %s/conf/application.conf" % (akka_subordinate_conf, bidmach_install_dir) + ssh(subordinate, opts, ascommand.encode('ascii','ignore')) print("Done!") @@ -1049,10 +1049,10 @@ def get_zones(conn, opts): # Gets the number of items in a partition def get_partition(total, num_partitions, current_partitions): - num_slaves_this_zone = total // num_partitions + num_subordinates_this_zone = total // num_partitions if (total % num_partitions) - current_partitions > 0: - num_slaves_this_zone += 1 - return num_slaves_this_zone + num_subordinates_this_zone += 1 + return num_subordinates_this_zone # Gets the IP address, taking into account the --private-ips flag @@ -1091,21 +1091,21 @@ def real_main(): print("Warning: Unrecognized EC2 instance type for instance-type: {t}".format( t=opts.instance_type), file=stderr) - if opts.master_instance_type != "": - if opts.master_instance_type not in EC2_INSTANCE_TYPES: - print("Warning: Unrecognized EC2 instance type for master-instance-type: {t}".format( - t=opts.master_instance_type), file=stderr) + if opts.main_instance_type != "": + if opts.main_instance_type not in EC2_INSTANCE_TYPES: + print("Warning: Unrecognized EC2 instance type for main-instance-type: {t}".format( + t=opts.main_instance_type), file=stderr) # Since we try instance types even if we can't resolve them, we check if they resolve first # and, if they do, see if they resolve to the same virtualization type. if opts.instance_type in EC2_INSTANCE_TYPES and \ - opts.master_instance_type in EC2_INSTANCE_TYPES: + opts.main_instance_type in EC2_INSTANCE_TYPES: if EC2_INSTANCE_TYPES[opts.instance_type] != \ - EC2_INSTANCE_TYPES[opts.master_instance_type]: - print("Error: bidmach-ec2 currently does not support having a master and slaves " + EC2_INSTANCE_TYPES[opts.main_instance_type]: + print("Error: bidmach-ec2 currently does not support having a main and subordinates " "with different AMI virtualization types.", file=stderr) - print("master instance virtualization type: {t}".format( - t=EC2_INSTANCE_TYPES[opts.master_instance_type]), file=stderr) - print("slave instance virtualization type: {t}".format( + print("main instance virtualization type: {t}".format( + t=EC2_INSTANCE_TYPES[opts.main_instance_type]), file=stderr) + print("subordinate instance virtualization type: {t}".format( t=EC2_INSTANCE_TYPES[opts.instance_type]), file=stderr) sys.exit(1) @@ -1132,48 +1132,48 @@ def real_main(): opts.zone = random.choice(conn.get_all_zones()).name if action == "launch": - if opts.slaves <= 0: - print("ERROR: You have to start at least 1 slave", file=sys.stderr) + if opts.subordinates <= 0: + print("ERROR: You have to start at least 1 subordinate", file=sys.stderr) sys.exit(1) if opts.resume: - (master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name) + (main_nodes, subordinate_nodes) = get_existing_cluster(conn, opts, cluster_name) else: - (master_nodes, slave_nodes) = launch_cluster(conn, opts, cluster_name) + (main_nodes, subordinate_nodes) = launch_cluster(conn, opts, cluster_name) wait_for_cluster_state( conn=conn, opts=opts, - cluster_instances=(master_nodes + slave_nodes), + cluster_instances=(main_nodes + subordinate_nodes), cluster_state='ssh-ready' ) - setup_cluster(conn, master_nodes, slave_nodes, opts, True, True) + setup_cluster(conn, main_nodes, subordinate_nodes, opts, True, True) elif action == "destroy": - (master_nodes, slave_nodes) = get_existing_cluster( + (main_nodes, subordinate_nodes) = get_existing_cluster( conn, opts, cluster_name, die_on_error=False) - if any(master_nodes + slave_nodes): + if any(main_nodes + subordinate_nodes): print("The following instances will be terminated:") - for inst in master_nodes + slave_nodes: + for inst in main_nodes + subordinate_nodes: print("> %s" % get_dns_name(inst, opts.private_ips)) print("ALL DATA ON ALL NODES WILL BE LOST!!") msg = "Are you sure you want to destroy the cluster {c}? (y/N) ".format(c=cluster_name) response = raw_input(msg) if response == "y": - print("Terminating master...") - for inst in master_nodes: + print("Terminating main...") + for inst in main_nodes: inst.terminate() - print("Terminating slaves...") - for inst in slave_nodes: + print("Terminating subordinates...") + for inst in subordinate_nodes: inst.terminate() # Delete security groups as well if opts.delete_groups: - group_names = [cluster_name + "-master", cluster_name + "-slaves"] + group_names = [cluster_name + "-main", cluster_name + "-subordinates"] wait_for_cluster_state( conn=conn, opts=opts, - cluster_instances=(master_nodes + slave_nodes), + cluster_instances=(main_nodes + subordinate_nodes), cluster_state='terminated' ) print("Deleting security groups (this will take some time)...") @@ -1217,58 +1217,58 @@ def real_main(): print("Try re-running in a few minutes.") elif action == "login": - (master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name) + (main_nodes, subordinate_nodes) = get_existing_cluster(conn, opts, cluster_name) if opts.node is not None: - slave = slave_nodes[opts.node] - if not slave.public_dns_name and not opts.private_ips: - print("Slave has no public DNS name. Maybe you meant to specify --private-ips?") + subordinate = subordinate_nodes[opts.node] + if not subordinate.public_dns_name and not opts.private_ips: + print("Subordinate has no public DNS name. Maybe you meant to specify --private-ips?") else: - slave = get_dns_name(slave, opts.private_ips) - print("Logging into slave " + slave + "...") + subordinate = get_dns_name(subordinate, opts.private_ips) + print("Logging into subordinate " + subordinate + "...") proxy_opt = [] if opts.proxy_port is not None: proxy_opt = ['-D', opts.proxy_port] subprocess.check_call( - ssh_command(opts) + proxy_opt + ['-t', '-t', "%s@%s" % (opts.user, slave)]) + ssh_command(opts) + proxy_opt + ['-t', '-t', "%s@%s" % (opts.user, subordinate)]) else: - if not master_nodes[0].public_dns_name and not opts.private_ips: - print("Master has no public DNS name. Maybe you meant to specify --private-ips?") + if not main_nodes[0].public_dns_name and not opts.private_ips: + print("Main has no public DNS name. Maybe you meant to specify --private-ips?") else: - master = get_dns_name(master_nodes[0], opts.private_ips) - print("Logging into master " + master + "...") + main = get_dns_name(main_nodes[0], opts.private_ips) + print("Logging into main " + main + "...") proxy_opt = [] if opts.proxy_port is not None: proxy_opt = ['-D', opts.proxy_port] subprocess.check_call( - ssh_command(opts) + proxy_opt + ['-t', '-t', "%s@%s" % (opts.user, master)]) + ssh_command(opts) + proxy_opt + ['-t', '-t', "%s@%s" % (opts.user, main)]) - elif action == "reboot-slaves": + elif action == "reboot-subordinates": response = raw_input( "Are you sure you want to reboot the cluster " + - cluster_name + " slaves?\n" + - "Reboot cluster slaves " + cluster_name + " (y/N): ") + cluster_name + " subordinates?\n" + + "Reboot cluster subordinates " + cluster_name + " (y/N): ") if response == "y": - (master_nodes, slave_nodes) = get_existing_cluster( + (main_nodes, subordinate_nodes) = get_existing_cluster( conn, opts, cluster_name, die_on_error=False) - print("Rebooting slaves...") - for inst in slave_nodes: + print("Rebooting subordinates...") + for inst in subordinate_nodes: if inst.state not in ["shutting-down", "terminated"]: print("Rebooting " + inst.id) inst.reboot() - elif action == "get-master": - (master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name) - if not master_nodes[0].public_dns_name and not opts.private_ips: - print("Master has no public DNS name. Maybe you meant to specify --private-ips?") + elif action == "get-main": + (main_nodes, subordinate_nodes) = get_existing_cluster(conn, opts, cluster_name) + if not main_nodes[0].public_dns_name and not opts.private_ips: + print("Main has no public DNS name. Maybe you meant to specify --private-ips?") else: - print(get_dns_name(master_nodes[0], opts.private_ips)) + print(get_dns_name(main_nodes[0], opts.private_ips)) - elif action == "get-slaves": - (master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name) - for inst in slave_nodes: + elif action == "get-subordinates": + (main_nodes, subordinate_nodes) = get_existing_cluster(conn, opts, cluster_name) + for inst in subordinate_nodes: if not inst.public_dns_name and not opts.private_ips: - print("Slave %s has no public DNS name. Maybe you meant to specify --private-ips?" % inst) + print("Subordinate %s has no public DNS name. Maybe you meant to specify --private-ips?" % inst) else: print(get_dns_name(inst, opts.private_ips)) @@ -1278,17 +1278,17 @@ def real_main(): cluster_name + "?\nDATA ON EPHEMERAL DISKS WILL BE LOST, " + "BUT THE CLUSTER WILL KEEP USING SPACE ON\n" + "AMAZON EBS IF IT IS EBS-BACKED!!\n" + - "All data on spot-instance slaves will be lost.\n" + + "All data on spot-instance subordinates will be lost.\n" + "Stop cluster " + cluster_name + " (y/N): ") if response == "y": - (master_nodes, slave_nodes) = get_existing_cluster( + (main_nodes, subordinate_nodes) = get_existing_cluster( conn, opts, cluster_name, die_on_error=False) - print("Stopping master...") - for inst in master_nodes: + print("Stopping main...") + for inst in main_nodes: if inst.state not in ["shutting-down", "terminated"]: inst.stop() - print("Stopping slaves...") - for inst in slave_nodes: + print("Stopping subordinates...") + for inst in subordinate_nodes: if inst.state not in ["shutting-down", "terminated"]: if inst.spot_instance_request_id: inst.terminate() @@ -1296,33 +1296,33 @@ def real_main(): inst.stop() elif action == "start": - (master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name) - print("Starting slaves...") - for inst in slave_nodes: + (main_nodes, subordinate_nodes) = get_existing_cluster(conn, opts, cluster_name) + print("Starting subordinates...") + for inst in subordinate_nodes: if inst.state not in ["shutting-down", "terminated"]: inst.start() - print("Starting master...") - for inst in master_nodes: + print("Starting main...") + for inst in main_nodes: if inst.state not in ["shutting-down", "terminated"]: inst.start() wait_for_cluster_state( conn=conn, opts=opts, - cluster_instances=(master_nodes + slave_nodes), + cluster_instances=(main_nodes + subordinate_nodes), cluster_state='ssh-ready' ) # Determine types of running instances - existing_master_type = master_nodes[0].instance_type - existing_slave_type = slave_nodes[0].instance_type - # Setting opts.master_instance_type to the empty string indicates we - # have the same instance type for the master and the slaves - if existing_master_type == existing_slave_type: - existing_master_type = "" - opts.master_instance_type = existing_master_type - opts.instance_type = existing_slave_type - - setup_cluster(conn, master_nodes, slave_nodes, opts, False, False) + existing_main_type = main_nodes[0].instance_type + existing_subordinate_type = subordinate_nodes[0].instance_type + # Setting opts.main_instance_type to the empty string indicates we + # have the same instance type for the main and the subordinates + if existing_main_type == existing_subordinate_type: + existing_main_type = "" + opts.main_instance_type = existing_main_type + opts.instance_type = existing_subordinate_type + + setup_cluster(conn, main_nodes, subordinate_nodes, opts, False, False) else: print("Invalid action: %s" % action, file=stderr) diff --git a/scripts/cluster_mux.py b/scripts/cluster_mux.py index 6d663129..e342f726 100755 --- a/scripts/cluster_mux.py +++ b/scripts/cluster_mux.py @@ -2,7 +2,7 @@ import subprocess import time -SPARK_SLAVE_PATH = '/opt/spark/conf/slaves' +SPARK_SLAVE_PATH = '/opt/spark/conf/subordinates' def tmux_cmd(cmd, fail_ok=False): @@ -31,9 +31,9 @@ def main(): pane_ids = tmux_cmd('list-panes -t tail-workers -F #D') with open(SPARK_SLAVE_PATH, 'r') as f: - slave_addrs = list(f.readlines()) + subordinate_addrs = list(f.readlines()) - for pid, saddr in zip(pane_ids, slave_addrs): + for pid, saddr in zip(pane_ids, subordinate_addrs): send_cmd(pid, 'su2') time.sleep(0.05) send_cmd(pid, 'ssh {}'.format(saddr)) diff --git a/scripts/collect_files.py b/scripts/collect_files.py index 3f9ec8fc..ffd78c78 100755 --- a/scripts/collect_files.py +++ b/scripts/collect_files.py @@ -7,17 +7,17 @@ def main(): files = sys.argv[1:] - s = subprocess.check_output("python bidmach_ec2.py -k id_rsa -i ~/.ssh/id_rsa --region=us-west-2 get-slaves " + os.environ['CLUSTER'], shell=True) - slaves = s.splitlines()[2:] + s = subprocess.check_output("python bidmach_ec2.py -k id_rsa -i ~/.ssh/id_rsa --region=us-west-2 get-subordinates " + os.environ['CLUSTER'], shell=True) + subordinates = s.splitlines()[2:] dir = '/code/BIDMach/%s/%s' % (os.environ['CLUSTER'], datetime.datetime.now().strftime("%Y%m%d%H%M")) os.mkdir(dir) - for s in slaves: - slave_dir = '%s/%s' % (dir, s) - os.mkdir(slave_dir) - todostr = 'rsync -e "ssh -i ~/.ssh/id_rsa -o StrictHostKeyChecking=no" -avz ubuntu@%s:/code/BIDMach/logs/log.0.0.txt %s/' % (s, slave_dir) + for s in subordinates: + subordinate_dir = '%s/%s' % (dir, s) + os.mkdir(subordinate_dir) + todostr = 'rsync -e "ssh -i ~/.ssh/id_rsa -o StrictHostKeyChecking=no" -avz ubuntu@%s:/code/BIDMach/logs/log.0.0.txt %s/' % (s, subordinate_dir) print(todostr) subprocess.check_call(todostr, shell=True) - todostr = 'rsync -e "ssh -i ~/.ssh/id_rsa -o StrictHostKeyChecking=no" -avz ubuntu@%s:/code/BIDMach/scripts/logres* %s/' % (s, slave_dir) + todostr = 'rsync -e "ssh -i ~/.ssh/id_rsa -o StrictHostKeyChecking=no" -avz ubuntu@%s:/code/BIDMach/scripts/logres* %s/' % (s, subordinate_dir) print(todostr) subprocess.check_call(todostr, shell=True)