|
#!/usr/bin/env python
# gevent monkey-patching is done before any other import so the modules
# below pick up the cooperative (patched) socket/thread primitives.
from gevent import monkey
monkey.patch_all()

import click
import json
import operator
import random
import time

from munkres import Munkres, make_cost_matrix
from nylas.logging import get_logger, configure_logging
configure_logging()
log = get_logger()

from inbox.config import config
from inbox.scheduling.deferred_migration import (DeferredAccountMigration,
                                                 DeferredAccountMigrationExecutor)
from inbox.models.session import global_session_scope
from inbox.models.account import Account
from inbox.util import fleet
from inbox.models.session import session_scope

# How long we should take to migrate all accounts (in seconds).
ACCOUNT_MIGRATION_TIMESPAN = 15 * 60  # 15 minutes
| 26 | + |
| 27 | + |
def actual_hostname(hostname):
    """Translate 'localhost' to the dev VM's hostname; pass everything else through.

    Small convenience hack so the rebalance script can be run locally
    inside a dev VM.
    """
    return 'precise64' if hostname == 'localhost' else hostname
| 33 | + |
| 34 | + |
def jitter_for_deadline(timespan):
    """Return a random delay in [10, timespan) seconds.

    Spreads account migrations out over the timespan so they don't all
    fire at once.
    """
    floor = 10
    span = timespan - floor
    return floor + random.random() * span
| 39 | + |
| 40 | + |
def is_account_on_debug_host(account_id, debug_hosts):
    """Return True if the account's current sync_host is one of debug_hosts.

    Fix: the original used Query.get() on a column query
    (query(Account.sync_host).get(...)), which raises InvalidRequestError —
    get() is only valid on full-entity queries — and would in any case
    yield a row tuple rather than the bare sync_host string. Use a
    filtered scalar() lookup instead.
    """
    with session_scope(account_id) as db_session:
        sync_host = (db_session.query(Account.sync_host)
                     .filter(Account.id == account_id)
                     .scalar())
        return sync_host in debug_hosts
| 45 | + |
| 46 | + |
def partition_accounts(load_per_account, num_buckets):
    """Greedily partition account ids into num_buckets roughly equal-load buckets.

    Longest-processing-time-first heuristic: seed each bucket with one of
    the heaviest accounts, then repeatedly drop the next-heaviest account
    into the currently lightest bucket. http://stackoverflow.com/a/6670011

    :param load_per_account: dict mapping account id -> numeric load.
    :param num_buckets: number of buckets (sync processes) to fill.
    :returns: list of num_buckets lists of account ids.
    """
    sorted_loads = sorted(load_per_account.items(),
                          key=operator.itemgetter(1), reverse=True)
    buckets = [[] for _ in range(num_buckets)]
    bucket_totals = [0.0] * num_buckets

    # Seed each bucket with one of the num_buckets heaviest accounts.
    for i, (account_id, load) in enumerate(sorted_loads[:num_buckets]):
        buckets[i].append(account_id)
        bucket_totals[i] += load

    # Place every remaining account in the currently least-loaded bucket
    # (first such bucket on ties, matching list.index(min(...)) semantics).
    for account_id, load in sorted_loads[num_buckets:]:
        i = min(range(num_buckets), key=lambda j: bucket_totals[j])
        buckets[i].append(account_id)
        bucket_totals[i] += load
    return buckets
| 66 | + |
| 67 | + |
def get_account_hosts():
    """Return {account id (as str): sync_host} for all accounts that should sync."""
    with global_session_scope() as db_session:
        rows = (db_session.query(Account.id, Account.sync_host)
                .filter(Account.sync_should_run))
        return {str(account_id): host for account_id, host in rows}
| 73 | + |
| 74 | + |
def do_minimize_migrations(hosts, buckets, should_optimize=True):
    """Reorder buckets so each one lands on the sync proc already holding
    the most of its accounts.

    Our task is to find a bipartite matching between buckets and host
    processes that maximizes the number of Accounts already assigned to
    the correct sync host. We use the Hungarian algorithm, which computes
    a matching between n workers and n tasks minimizing overall cost
    (https://en.wikipedia.org/wiki/Hungarian_algorithm), via the munkres
    library. Since it minimizes cost, we build a profit matrix first and
    then convert it into a cost matrix.

    :param hosts: list of host dicts with 'name' and 'num_procs'.
    :param buckets: list of account-id lists, one per sync proc.
    :param should_optimize: skip the matching (identity assignment) if False.
    :returns: buckets reordered to their matched sync procs.
    """
    account_hosts = get_account_hosts()
    sync_procs = []
    for host in hosts:
        for i in range(host['num_procs']):
            sync_procs.append('{}:{}'.format(actual_hostname(host['name']), i))

    # Construct the profit matrix. Each row corresponds to a bucket and each
    # column within that row corresponds to the number of items in that
    # bucket currently assigned to the corresponding sync proc.
    profit_matrix = []
    max_num_present = 0
    for bucket in buckets:
        row = []
        for proc_id in sync_procs:
            num_present = sum(1 for account_id in bucket
                              if account_hosts.get(account_id) == proc_id)
            # We add 1 because the munkres library can't really handle
            # matrices with 0 values :-/ This won't change the answer.
            num_present += 1
            row.append(num_present)
            max_num_present = max(num_present, max_num_present)
        profit_matrix.append(row)

    if should_optimize:
        # +1 keeps every converted cost strictly positive.
        max_num_present += 1
        cost_matrix = make_cost_matrix(profit_matrix,
                                       lambda profit: max_num_present - profit)
        indexes = Munkres().compute(cost_matrix)
    else:
        # Identity assignment: bucket i stays on sync proc i.
        indexes = [(i, i) for i in range(len(sync_procs))]

    # Reorder the original buckets so bucket `row` is served by the sync
    # proc at column `column` of the matching.
    result_buckets = [None for _ in indexes]
    total_profit = 0
    total_accounts = 0
    for row, column in indexes:
        total_profit += profit_matrix[row][column] - 1  # undo the +1 padding
        result_buckets[column] = buckets[row]
        total_accounts += len(buckets[row])
    # Guard the percentage against ZeroDivisionError when there are no
    # accounts at all (the original crashed here in that case).
    correct_percent = (float(total_profit) / float(total_accounts) * 100.0
                       if total_accounts else 0.0)
    log.info("Accounts already on the correct hosts:",
             correct_accounts=total_profit,
             total_accounts=total_accounts,
             correct_percent=correct_percent)
    return result_buckets
| 135 | + |
| 136 | + |
def migrate_accounts(zone, hosts, buckets, timespan):
    """Schedule deferred migrations moving each bucket's accounts to its host.

    Buckets are consumed in order, one per (host, process) pair. Each
    account gets a random delay within `timespan` so the migrations are
    spread out rather than all firing at once.
    """
    start_time = time.time()
    executor = DeferredAccountMigrationExecutor()  # Just for its Redis thingy

    bucket_idx = 0
    # NOTE: the original enumerated hosts but never used the index.
    for host in hosts:
        host['name'] = actual_hostname(host['name'])

        for process_idx in range(host['num_procs']):
            instance = '{}:{}'.format(host['name'], process_idx)
            bucket = buckets[bucket_idx]
            bucket_idx += 1

            for account_id in bucket:
                delay = jitter_for_deadline(timespan)
                deadline = start_time + delay
                log.info("Sync load balancer migrating Account",
                         zone=zone,
                         account_id=account_id,
                         host=instance,
                         delay=delay)
                dam = DeferredAccountMigration(deadline, account_id, instance)
                dam.save(executor.redis)
| 160 | + |
| 161 | + |
def balance_zone(zone, normal_hosts, debug_hosts, account_loads, timespan,
                 minimize_migrations, dry_run):
    """Rebalance all accounts in `zone` across `normal_hosts`.

    Accounts currently on a debug host are excluded from rebalancing.
    When dry_run is set, only print the planned assignment.
    """
    num_buckets = sum(host['num_procs'] for host in normal_hosts)
    # Leave accounts deliberately parked on debug hosts alone.
    account_loads = {account_id: load
                     for account_id, load in account_loads.items()
                     if not is_account_on_debug_host(account_id, debug_hosts)}
    buckets = partition_accounts(account_loads, num_buckets)
    buckets = do_minimize_migrations(normal_hosts, buckets, minimize_migrations)
    if dry_run:
        # Parenthesized single-argument print behaves identically on
        # Python 2 and 3 (the original py2-only print statements don't).
        print("Would reassign accounts in zone {} like this:".format(zone))
        for bucket in buckets:
            bucket_load = sum(account_loads[account_id] for account_id in bucket)
            print("\t{}: {}".format(bucket_load, bucket))
        return
    migrate_accounts(zone, normal_hosts, buckets, timespan)
| 177 | + |
| 178 | + |
@click.command()
@click.option('--level', default='staging')
@click.option('--dry-run', is_flag=True, default=False)
@click.option('--timespan', default=ACCOUNT_MIGRATION_TIMESPAN)
@click.option('--minimize-migrations/--no-minimize-migrations', default=True)
@click.argument('account-loads')
def main(dry_run, level, timespan, minimize_migrations, account_loads):
    """Rebalance every zone's sync hosts using the ACCOUNT_LOADS json file."""
    zones = {h.get('ZONE') for h in config['DATABASE_HOSTS']}
    with open(account_loads) as f:
        load_per_account = json.load(f)
    for zone in zones:
        loads = load_per_account.get(zone)
        if loads is None:
            # Loads for accounts with no zone are keyed by JSON null.
            loads = load_per_account['null']
        hosts = fleet.get_sync_hosts_in_zone(zone, level)
        normal_hosts = [h for h in hosts if not h['debug']]
        # Fix: host entries are dicts, which are unhashable, so the
        # original set(h for h in hosts if h['debug']) raised TypeError.
        # Collect the debug host *names* instead.
        # NOTE(review): confirm Account.sync_host stores bare host names
        # (not 'host:proc') so the membership test downstream can match.
        debug_hosts = {h['name'] for h in hosts if h['debug']}
        balance_zone(zone, normal_hosts, debug_hosts, loads, timespan,
                     minimize_migrations, dry_run)
| 198 | + |
| 199 | + |
if __name__ == '__main__':
    # click-decorated entry point; parses CLI options/arguments from sys.argv.
    main()
0 commit comments