#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Author : Yohan Bataille (ING)
# redis-manager
from __future__ import print_function
version = "4"

# Requires:
# redis-cli
# python 2 (2.6 and 2.7 tested)

import sys
import os
import json
import time
import copy
from subprocess import PIPE, Popen
import argparse
import SocketServer
import socket
from BaseHTTPServer import BaseHTTPRequestHandler
import threading
import traceback
import pprint
import urllib2
from random import randint
from threading import Lock

# Serializes writes to stdout between the threads that call xprint().
xprint_lock = Lock()


def xprint(*args, **kwargs):
    """Thread safe print function"""
    with xprint_lock:
        print(*args, **kwargs)
        sys.stdout.flush()


xprint("redis manager version: " + version)

# positional : REDIS_PATH, REDIS_NODES, ENV, HTTP_PORT (in this order, all mandatory, ENV : "DEV" or "PROD")
# optional : time (-t), dry_run (-n)

parser = argparse.ArgumentParser()
parser.add_argument("REDIS_PATH", help="Path to redis-cli binary. Example: '/opt/redis/bin/redis-cli'")
# The example mirrors what cluster_startup_topo() parses: '/' between
# datacenters, ',' between nodes, '<ip>:<port>' for each node.
parser.add_argument("REDIS_NODES", help="Please specify a list of nodes as a single string, grouping nodes by datacenter and using '/' to separate datacenters if there is more than one. Example: '<ip>:<port>,<ip>:<port>,<ip>:<port> / <ip>:<port>,<ip>:<port>,<ip>:<port>'.")
parser.add_argument("ENV", choices=['DEV', 'PROD'], help="Please specify 'DEV' or 'PROD'.")
parser.add_argument("HTTP_PORT", help="Listen to request on this HTTP port.")
parser.add_argument("-m", "--other_managers", help="List of other managers as a single string. Example: 'slzuss3vmq00.sres.stech:8080, slzuss3vmq01.sres.stech:8080'.")
parser.add_argument("-t", "--time", help="Time to wait between checks, in seconds. Default: 3")
parser.add_argument("-w", "--failover_max_wait", help="How long to wait for Redis cluster to obey a failover command, in seconds. Default: 30")
parser.add_argument("-u", "--unresponsive_timeout", help="How long to wait before assuming the manager main loop is stuck, in seconds. Default: 30")
parser.add_argument("-n", "--dry_run", help="Do not send modifications commands to the cluster.", action="store_true")
args = parser.parse_args()

# if environment >= NEHOM ==> all_checks needs to be True
all_checks = True
if args.ENV == "DEV":
    all_checks = False

other_managers = list()
if args.other_managers:
    # Clean the string
    other_managers = "".join(args.other_managers.split())
    other_managers = other_managers.split(',')

loop_time = 3
if args.time:
    loop_time = int(args.time)

failover_max_wait = 30
if args.failover_max_wait:
    failover_max_wait = int(args.failover_max_wait)

unresponsive_timeout = 30
if args.unresponsive_timeout:
    unresponsive_timeout = int(args.unresponsive_timeout)

# Dry-run flag: when True, the failover helpers only print the redis-cli
# command instead of running it.
test = False
if args.dry_run:
    test = True

redis_cli = args.REDIS_PATH
http_port = int(args.HTTP_PORT)

xprint("HTTP_PORT: " + str(http_port))
xprint("REDIS_NODES: " + args.REDIS_NODES)
xprint("all_checks: " + str(all_checks))
xprint("loop_time: " + str(loop_time))
xprint("failover_max_wait: " + str(failover_max_wait))
xprint("other_managers: " + str(other_managers))
xprint("redis_cli: " + redis_cli)
xprint("test: " + str(test))

#[root@slzuss3vmq00 ~]# /opt/ZUTA0/Logiciel/RDI/bin/redis-cli -h -p 7002 --raw CLUSTER INFO
#cluster_state:fail
#[root@slzuss3vmq00 ~]# /opt/ZUTA0/Logiciel/RDI/bin/redis-cli -h -p 7002 --raw CLUSTER NODES
#bd6aef93f187bab16d12236cce2faf0ac40ad727 master,fail? - 1496355476275 1496355472467 11 disconnected 5461-10922
#7d134a073e5be16f2d2a73e27cc63b062e74c91f master,fail? - 1496355476275 1496355475475 9 disconnected 10923-16383
#af5f4f889bf6ee6573ec57625216e7d4416e37ae slave bd6aef93f187bab16d12236cce2faf0ac40ad727 0 1496357907477 11 connected
#0585d0b297d3a15013472ab83187d5b0422a4f23 myself,slave 7d134a073e5be16f2d2a73e27cc63b062e74c91f 0 0 8 connected
#bea1cec63063542b0cc003b58e198b68c7993545 slave e44f67740aa2be4a568587d8ac7d4914614934fb 0 1496357910486 10 connected
#e44f67740aa2be4a568587d8ac7d4914614934fb master - 0 1496357909482 10 connected 0-5460
#[root@slzuss3vmq00 ~]# /opt/ZUTA0/Logiciel/RDI/bin/redis-cli -h -p 7002 --raw INFO
## Replication
#role:slave
#master_host:
#master_port:7005
#master_link_status:down
#[root@slzuss3vmq00 ~]# /opt/ZUTA0/Logiciel/RDI/bin/redis-cli -h -p 7003 --raw INFO
## Replication
#role:master
#connected_slaves:1
#slave0:ip=,port=7000,state=online,offset=3809,lag=1


def api_help():
    """Return the plain-text description of the HTTP API served on /help."""
    return """
### HTTP API description:
#   /cluster_status: 'manager disabled'
#                    'Unknown state'
#                    'KO'
#                    'Unknown cluster'
#                    'At risk'
#                    'Imbalanced'
#                    'OK'
#   /manager_status: 'active'
#                    'active asleep'
#                    'active asleep repartition disabled'
#                    'active repartition disabled'
#                    'passive'
#                    'passive asleep'
#                    'passive asleep repartition disabled'
#                    'passive repartition disabled'
#                    'failed'
#                    'starting'
#   /help : This message
#   /debug/enable
#   /debug/disable
#   /sleep?seconds=<seconds>
#   /prepare_for_reboot/<engine_ip>&duration=<seconds>:
#                    'SIMILAR REQUEST ALREADY IN PROGRESS'
#                    'WAIT'
#                    'DONE'
#                    'CANNOT PROCEED: passive manager'
#                    example : /prepare_for_reboot/<engine_ip>&duration=<seconds>
#   /version : redis manager version
"""


# TODO: Remove global variables.
def main():
    """Run the manager's endless control loop.

    Each pass: poll the other managers to decide whether this instance is
    active or passive, honour a pending sleep request, fetch the cluster
    topology, then (only when active) request failovers: takeovers when the
    cluster is down without quorum, the next step of a stored plan, the
    eviction of masters from an engine about to reboot, or a rebalancing
    failover. The outcome is published through the module-level globals
    read by the HTTP handler (cluster_state, manager_status, ...).

    NOTE(review): this function reached review with its indentation
    flattened; the nesting below is reconstructed from the statements
    themselves. The two places where more than one nesting was plausible
    carry their own NOTE(review).
    """
    global failover_without_quorum_requested
    global failover_with_quorum_requested
    global plan
    global cluster_state
    global manager_status
    global request_active
    global sleep_until
    global no_repartition_until
    global no_repartition_duration
    global slave_only_engine
    global raw_topo
    global last_loop_epoch

    startup_nodes_list, startup_nodes_by_datacenter, startup_nodes_by_server = cluster_startup_topo()
    failover_without_quorum_requested = list()
    failover_with_quorum_requested = list()
    plan = dict()

    current_cluster_topo = list()

    if all_checks:
        xprint("PROD mode enabled: master imbalance correction by server and datacenter activated.")
    else:
        xprint("DEV mode enabled: master imbalance correction by server and datacenter deactivated.")

    while True:
        # Heartbeat compared against a timeout by the HTTP handler.
        last_loop_epoch = time.mktime(time.gmtime())

        # --- Manager election -------------------------------------------
        num_manager_active = 0
        sleep = False
        for manager in other_managers:
            try:
                r = urllib2.urlopen('http://' + manager + '/manager_status', None, 1)
                l = r.readlines()
                if len(l) == 1 and (l[0].split()[0] == 'passive' or l[0].split()[0] == 'starting'):
                    if request_active:
                        r2 = urllib2.urlopen('http://' + manager + '/request_active', None, 1)
                        l2 = r2.readlines()
                        if len(l2) == 1 and l2[0] == 'yes':
                            # Both sides asked to become active: back off.
                            xprint("other has requested activation")
                            request_active = False
                            sleep = True
                elif len(l) == 1 and l[0].split()[0] == 'active':
                    if num_manager_active == 0:
                        num_manager_active += 1
                    else:
                        xprint("Too many active managers!")
                elif len(l) == 1 and l[0].split()[0] == 'failed':
                    xprint("manager " + manager + " is KO.")
                else:
                    xprint("manager " + manager + " has answered with garbage: " + str(l))
            except Exception:
                # Any network or parsing error counts as "not responding";
                # KeyboardInterrupt/SystemExit are no longer swallowed here.
                xprint("manager " + manager + " is not responding.")

        if num_manager_active == 0 and (manager_status == 'passive' or manager_status == 'starting'):
            if request_active:
                xprint("Becoming active!")
                manager_status = 'active'
                request_active = False
            elif sleep:
                xprint("Sleeping to let another manager activate.")
                time.sleep(randint(1, 10))
                continue
            else:
                request_active = True
                xprint("Manager election in progress.")
                time.sleep(randint(1, 10))
                continue
        elif num_manager_active == 1 and manager_status == 'starting':
            xprint("Manager election finished, we are passive!")
            manager_status = 'passive'
        elif num_manager_active >= 1 and manager_status == 'active':
            xprint("Becoming passive!")
            manager_status = 'passive'

        # --- Sleep requested through the HTTP API -----------------------
        if sleep_until != 0:
            delta = sleep_until - time.mktime(time.gmtime())
            if delta <= 0:
                sleep_until = 0
            else:
                xprint("Sleeping as requested. " + str(delta) + " seconds remaining.")
                time.sleep(loop_time)
                continue

        # --- Topology acquisition and sanity checks ---------------------
        raw_topo, raw_topo_str = cluster_topo(startup_nodes_list, startup_nodes_by_datacenter)
        if raw_topo is None:
            cluster_state = 'Unknown state'
            xprint('Critical failure: cannot get cluster topology. Doing nothing.')
            time.sleep(loop_time)
            continue
        bool_cluster_online = cluster_online(startup_nodes_list)
        if not bool_cluster_online:
            cluster_state = 'KO'
            xprint('Cluster failure.')
        # print(raw_topo_str)
        if not same_cluster(startup_nodes_list, raw_topo):
            pprint.pprint(raw_topo, width=300)
            cluster_state = 'Unknown cluster'
            xprint('Not the exact cluster we know. Doing nothing.')
            time.sleep(loop_time)
            continue
        for node in raw_topo:
            if node_status(node) == 'cluster still assessing':
                xprint('Something is going on but Redis Cluster is still assessing the situation. Doing nothing.')
                pprint.pprint(raw_topo, width=300)
                time.sleep(loop_time)
                # This `continue` only moves on to the next node of this
                # `for` loop; the pass itself goes on afterwards. Left as is
                # on purpose: the takeover branch below promotes slaves of
                # any master whose status is not 'ok', 'fail?' included.
                continue

        # Loop over nodes, detect slaves with offline master and promote one slave (keep track of treated failed masters)
        if not bool_cluster_online:
            pprint.pprint(raw_topo, width=300)
            if has_quorum(raw_topo):
                xprint("Cluster has quorum and can recover by itself. Doing nothing.")
            else:
                failed_masters = list()
                new_failover_without_quorum_requested = list()
                for node in raw_topo:
                    if node_role(node) == 'slave' and node_status(node) == 'ok':
                        if master_status_of_slave(raw_topo, node) != 'ok':
                            masterid = masterid_of_slave(node)
                            if masterid in failed_masters:
                                xprint("Slave " + node_name(node) + " does not see master " + node_name_from_id(raw_topo, masterid) + ", but a slave has already been promoted. Doing nothing.")
                            elif manager_status == 'active':
                                # Each call re-evaluates (and prunes) the
                                # pending requests; call order is kept.
                                if failover_without_quorum_did_not_happen(raw_topo)["one_did_not_happen"]:
                                    if not failover_without_quorum_did_not_happen(raw_topo)["one_cleared"]:
                                        xprint("Cluster did not comply with our previous failover request. Waiting.")
                                else:
                                    xprint("Failed master: " + node_name_from_id(raw_topo, masterid) + ". Promoting slave " + node_name(node) + ".")
                                    failover_without_quorum(node_ip(node), node_port(node))
                                    new_failover_without_quorum_requested.append({'slave': node_object_from_node_name(node_name(node)), 'epoch': time.mktime(time.gmtime())})
                                    failed_masters.append(masterid)
                # Requests made during this pass are only recorded once the
                # scan is over.
                failover_without_quorum_requested = failover_without_quorum_requested + new_failover_without_quorum_requested
                if failed_masters == list():
                    xprint("Critical failure : no slave remaining, cannot do anything.")
        else:
            # NOTE(review): everything from here down to the
            # `cluster_state = 'OK'` assignment is reconstructed as part of
            # this "cluster online" branch, so that an offline cluster keeps
            # its 'KO' state and falls straight through to the final sleep.
            # Confirm against the pristine file.

            # Detect risky situations
            failure = False
            for node in raw_topo:
                if node_role(node) == 'master' and node_status(node) == 'ok':
                    # Detect master without slave
                    if not master_has_at_least_one_slave(raw_topo, node):
                        xprint("Master " + node_name(node) + " has no slave !")
                        failure = True
                elif node_role(node) == 'slave' and all_checks:
                    # Detect slave on same server as master.
                    if node_ip(node) == node_ip_from_id(raw_topo, masterid_of_slave(node)):
                        xprint("Slave " + node_name(node) + " is on the same server as master " + node_name_from_id(raw_topo, masterid_of_slave(node)))
                        failure = True
                    # Detect slave on same datacenter as master.
                    if node_datacenter(node) == node_datacenter_from_id(raw_topo, masterid_of_slave(node)):
                        xprint("Slave " + node_name(node) + " is on the same datacenter as master " + node_name_from_id(raw_topo, masterid_of_slave(node)))
                        failure = True
            if failure:
                cluster_state = 'At risk'
                time.sleep(loop_time)
                continue

            # Track role changes between passes.
            if current_cluster_topo == list():
                pprint.pprint(raw_topo, width=300)
                current_cluster_topo = raw_topo
            elif cluster_has_changed(current_cluster_topo, raw_topo):
                xprint("Cluster topology has changed")
                pprint.pprint(raw_topo, width=300)
                current_cluster_topo = raw_topo

            # --- Multi-step rebalancing plan in progress ------------------
            if plan != dict() and manager_status == 'active':
                #pprint.pprint(plan, width=300)
                # Key '0' holds the starting topology, the others are steps.
                steps = [int(key) for key in plan.keys()]
                steps.remove(0)
                current_step = min(steps)
                if not cluster_has_changed(current_cluster_topo, plan['0']['starting_topo']):
                    if failover_with_quorum_did_not_happen(raw_topo)["one_cleared"]:
                        xprint("Cluster did not comply with our previous failover request. We reached the timeout. Plan Failed. Forget it.")
                        plan = dict()
                    else:
                        xprint("Still waiting for the cluster to proceed with the failover.")
                elif cluster_has_changed(current_cluster_topo, plan[str(current_step)]['target_topo']):
                    xprint("Cluster topology is not what we would expect. Something happened. Plan failed. Forget it.")
                    plan = dict()
                else:
                    if len(steps) > 1:
                        xprint("Step " + str(current_step) + " succeeded.")
                        del plan[str(current_step)]
                        xprint("Launching step " + str(current_step + 1) + ".")
                        slave = plan[str(current_step + 1)]['slave']
                        master = plan[str(current_step + 1)]['master']
                        xprint("Slave " + slave + " will replace his master " + master)
                        node_object = node_object_from_node_name(slave)
                        failover_with_quorum(node_object['host'], node_object['port'])
                        failover_with_quorum_requested.append({'slave': node_object, 'epoch': time.mktime(time.gmtime())})
                    else:
                        xprint("Final step succeeded. The cluster is now balanced.")
                        xprint("I love it when a plan comes together!")
                        plan = dict()
                time.sleep(loop_time)
                continue

            # --- prepare_for_reboot: leave only slaves on one engine ------
            if slave_only_engine is not None and manager_status == 'active':
                if failover_without_quorum_did_not_happen(raw_topo)["one_did_not_happen"] or failover_with_quorum_did_not_happen(raw_topo)["one_did_not_happen"]:
                    if not failover_without_quorum_did_not_happen(raw_topo)["one_cleared"] and not failover_with_quorum_did_not_happen(raw_topo)["one_cleared"]:
                        xprint("Cluster did not comply with our previous failover request. Waiting.")
                else:
                    # Failover all master nodes on this engine
                    for node in raw_topo:
                        if node_role(node) == 'master' and node_status(node) == 'ok' and node_ip(node) == slave_only_engine:
                            slave = get_one_slave(raw_topo, node)
                            if slave is None:
                                xprint("Master " + node_name(node) + " has no slave !")
                            else:
                                failover_with_quorum(node_ip(slave), node_port(slave))
                                failover_with_quorum_requested.append({'slave': node_object_from_node_name(node_name(slave)), 'epoch': time.mktime(time.gmtime())})
                    # Engine has already only slaves, starting the clock.
                    # NOTE(review): placed inside this `else`, right after
                    # the scan above; it may originally have been one level
                    # out. Confirm against the pristine file.
                    if not has_master(slave_only_engine, raw_topo) and no_repartition_until == 0:
                        no_repartition_until = time.mktime(time.gmtime()) + no_repartition_duration
            if no_repartition_until != 0:
                delta = no_repartition_until - time.mktime(time.gmtime())
                if delta <= 0:
                    # We reached the requested duration, resetting.
                    no_repartition_until = 0
                    slave_only_engine = None
                    no_repartition_duration = 0
                else:
                    xprint("Skipping master imbalance correction as requested " + str(delta) + " seconds remaining.")
                    time.sleep(loop_time)
                    continue
            if slave_only_engine is not None:
                xprint("Still trying to remove slaves from " + slave_only_engine)
                time.sleep(loop_time)
                continue

            # Loop over nodes, detect imbalanced master repartition and promote slaves accordingly
            imbalanced = False
            if not all_checks:
                pass
            elif len(startup_nodes_by_server) < 2:
                xprint("Only one server: skipping master imbalance correction.")
            else:
                server_master_repartition_dict = server_master_repartition(server_list(startup_nodes_by_server), raw_topo)
                datacenter_master_repartition_dict = datacenter_master_repartition(datacenter_count(startup_nodes_by_datacenter), raw_topo)

                # Detect too many masters on a server.
                name, master_count, master_total_count = detect_imbalance(server_master_repartition_dict)
                if name is not None:
                    cluster_state = 'Imbalanced'
                    imbalanced = True
                    xprint(server_master_repartition_dict)
                    #pprint.pprint(raw_topo, width=300)
                    xprint("Too many masters on server " + str(name) + ": " + str(master_count) + "/" + str(master_total_count))
                    if manager_status == 'active':
                        master, slave = find_failover_candidate(raw_topo, server_master_repartition_dict, datacenter_master_repartition_dict, startup_nodes_by_server, startup_nodes_by_datacenter)
                        if master is None or slave is None:
                            xprint("Could not find a failover solution.")
                        else:
                            if failover_without_quorum_did_not_happen(raw_topo)["one_did_not_happen"] or failover_with_quorum_did_not_happen(raw_topo)["one_did_not_happen"]:
                                if not failover_without_quorum_did_not_happen(raw_topo)["one_cleared"] and not failover_with_quorum_did_not_happen(raw_topo)["one_cleared"]:
                                    xprint("Cluster did not comply with our previous failover request. Waiting.")
                            else:
                                xprint("Slave " + slave + " will replace his master " + master)
                                node_object = node_object_from_node_name(slave)
                                failover_with_quorum(node_object['host'], node_object['port'])
                                failover_with_quorum_requested.append({'slave': node_object, 'epoch': time.mktime(time.gmtime())})
                    time.sleep(loop_time)
                    continue
                if len(startup_nodes_by_datacenter) < 2:
                    xprint("Only one datacenter: skipping master imbalance correction by datacenter.")
                else:
                    # Detect too many masters on a datacenter.
                    # It is possible to have no imbalance by server but an imbalance by datacenter (+1 master on each server of a datacenter compared to the other and at least 2 servers by datacenter).
                    name, master_count, master_total_count = detect_imbalance(datacenter_master_repartition_dict)
                    if name is not None:
                        cluster_state = 'Imbalanced'
                        imbalanced = True
                        xprint("Too many masters on datacenter " + str(name) + ": " + str(master_count) + "/" + str(master_total_count))
                        if manager_status == 'active':
                            master, slave = find_failover_candidate(raw_topo, server_master_repartition_dict, datacenter_master_repartition_dict, startup_nodes_by_server, startup_nodes_by_datacenter)
                            if master is None or slave is None:
                                xprint("Could not find a failover solution.")
                            else:
                                if failover_without_quorum_did_not_happen(raw_topo)["one_did_not_happen"] or failover_with_quorum_did_not_happen(raw_topo)["one_did_not_happen"]:
                                    if not failover_without_quorum_did_not_happen(raw_topo)["one_cleared"] and not failover_with_quorum_did_not_happen(raw_topo)["one_cleared"]:
                                        xprint("Cluster did not comply with our previous failover request. Waiting.")
                                else:
                                    xprint("Slave " + slave + " will replace his master " + master)
                                    node_object = node_object_from_node_name(slave)
                                    failover_with_quorum(node_object['host'], node_object['port'])
                                    failover_with_quorum_requested.append({'slave': node_object, 'epoch': time.mktime(time.gmtime())})
                        time.sleep(loop_time)
                        continue
            if not imbalanced:
                cluster_state = 'OK'
        time.sleep(loop_time)


def _failover_did_not_happen(requested, raw_topo):
    """Review the pending failover requests in `requested` against raw_topo.

    A request is dropped from `requested` (in place) when its slave now shows
    up as a master in raw_topo, or when it is older than failover_max_wait
    seconds. Returns {"one_did_not_happen": bool, "one_cleared": bool}: the
    first is True when at least one request is still pending, the second
    when at least one request was dropped because it timed out.
    """
    one_did_not_happen = False
    one_cleared = False
    # Iterate over a snapshot: entries are removed from `requested` inside
    # the loop, and removing from the list being iterated skips the entry
    # that follows each removal.
    for slave_dict in list(requested):
        slave_name = node_name_from_node_object(slave_dict['slave'])
        found_node_role = None
        for node in raw_topo:
            if slave_name == node_name(node):
                found_node_role = node_role(node)
                break
        if found_node_role == 'master':
            requested.remove(slave_dict)
        elif time.mktime(time.gmtime()) - slave_dict['epoch'] > failover_max_wait:
            xprint("Cluster has not performed failover for slave " + slave_name + " requested " + str(failover_max_wait) + " seconds ago. Removing the failover request.")
            requested.remove(slave_dict)
            one_cleared = True
        else:
            one_did_not_happen = True
    return {"one_did_not_happen": one_did_not_happen, "one_cleared": one_cleared}


def failover_without_quorum_did_not_happen(raw_topo):
    """Check the pending TAKEOVER requests; see _failover_did_not_happen()."""
    global failover_without_quorum_requested
    return _failover_did_not_happen(failover_without_quorum_requested, raw_topo)


def failover_with_quorum_did_not_happen(raw_topo):
    """Check the pending regular failover requests; see _failover_did_not_happen()."""
    global failover_with_quorum_requested
    return _failover_did_not_happen(failover_with_quorum_requested, raw_topo)


def has_quorum(raw_topo):
    """Return True when a strict majority of the masters has status 'ok'."""
    masters_ok_count = masters_ok_count_for_cluster(raw_topo)
    all_masters_count = masters_count_for_cluster(raw_topo)
    # Explicit floor division: same result as the Python 2 integer `/`, and
    # still a majority test if the file is ever run with true division.
    return masters_ok_count >= all_masters_count // 2 + 1


def detect_imbalance(master_repartition_dict):
    """Find a set (server or datacenter) holding too many masters.

    master_repartition_dict maps a set name to its number of masters.
    Returns (set_name, master_count, master_total_count) for the first set
    found above the allowed maximum, or (None, None, None) when there is
    none.
    """
    master_total_count = sum(master_repartition_dict.values())
    set_count = len(master_repartition_dict)
    half = master_total_count // 2
    even_total = master_total_count % 2 == 0

    # sets can be datacenters or servers
    if set_count == 2:
        # If we have only 2 sets and an even number of masters, we at least try not to have more than half the masters on one set
        # If we have only 2 sets and an odd number of masters, we at least try not to have more than half the masters + 1 on one set
        allowed = half if even_total else half + 1
    elif set_count == 3:
        # If we have 3 sets and 4 masters, we will have 2 masters on one set
        allowed = half
    else:
        allowed = half - 1 if even_total else half

    for set_name, master_count in master_repartition_dict.items():
        if master_count > allowed:
            return set_name, master_count, master_total_count
    return None, None, None


# Find the solution which minimizes the number of steps. The constraints are server_master_repartition and datacenter_master_repartition.
def find_failover_candidate(raw_topo, server_master_repartition_dict, datacenter_master_repartition_dict, startup_nodes_by_server, startup_nodes_by_datacenter):
    """Search for the shortest chain of failovers that balances the masters.

    Simulates master/slave role swaps breadth-first, up to max_steps deep,
    and stops at the first simulated topology accepted by solver_check().
    The whole chain is stored in the global `plan` (key '0' holds the
    starting topology, keys '1'.. hold one step each with its master, slave
    and target topology).

    Returns (master_name, slave_name) for the first step of the chain, or
    (None, None) when no solution was found; `plan` is then left empty.
    The two *_repartition_dict parameters are only overwritten for debug
    output.
    """
    global plan
    plan = dict()
    max_steps = 3
    # Every topology seen so far, keyed by its position in the search tree
    # ('0' is the root, '0.3.7' is reached through permutations 3 then 7).
    solution_steps_chain = {'0': raw_topo}
    # [master, slave] swap that led to each position.
    master_slave_steps_chain = dict()
    raw_topo_permut_dict = copy.deepcopy(solution_steps_chain)
    for i in range(0, max_steps):
        if debug:
            xprint(i)
        j = 0
        raw_topo_1_permutations = dict()
        for position, raw_topo_permut in raw_topo_permut_dict.items():
            # NOTE(review): the repartition dumps are reconstructed as
            # debug-only output (here and below) - confirm against the
            # pristine file.
            if debug:
                xprint("start position: ")
                pprint.pprint(raw_topo_permut, width=300)
                server_master_repartition_dict = server_master_repartition(server_list(startup_nodes_by_server), raw_topo_permut)
                xprint(server_master_repartition_dict)
                datacenter_master_repartition_dict = datacenter_master_repartition(datacenter_count(startup_nodes_by_datacenter), raw_topo_permut)
                xprint(datacenter_master_repartition_dict)
            # This only returns masters and slaves with node_status(master) == 'ok' or 'cluster still assessing' and node_status(slave) == 'ok' or 'cluster still assessing':
            master_slaves_dict = master_slaves_topo(raw_topo_permut)
            # generate all 1-permutation sets
            for master in master_slaves_dict:
                if debug:
                    xprint("master: " + str(master))
                for slave in master_slaves_dict[master]:
                    raw_topo_copy = copy.deepcopy(raw_topo_permut)
                    raw_topo_1_permutation = simul_failover(master, slave, raw_topo_copy)
                    if debug:
                        xprint("slave: " + str(slave))
                        server_master_repartition_dict = server_master_repartition(server_list(startup_nodes_by_server), raw_topo_1_permutation)
                        xprint(server_master_repartition_dict)
                        datacenter_master_repartition_dict = datacenter_master_repartition(datacenter_count(startup_nodes_by_datacenter), raw_topo_1_permutation)
                        xprint(datacenter_master_repartition_dict)
                        pprint.pprint(raw_topo_1_permutation, width=300)
                    j += 1
                    if raw_topo_1_permutation not in solution_steps_chain.values():
                        #print "not already stored"
                        if solver_check(raw_topo_1_permutation, startup_nodes_by_server, startup_nodes_by_datacenter):
                            xprint("Found a solution: ")
                            pprint.pprint(raw_topo_1_permutation, width=300)
                            # return the first step
                            if i == 0:
                                xprint("Sounds like a plan !")
                                xprint("only one step : " + str([master, slave]))
                                plan['0'] = {'starting_topo': copy.deepcopy(raw_topo)}
                                plan['1'] = {'master': master, 'slave': slave, 'target_topo': raw_topo_1_permutation}
                                return master, slave
                            else:
                                #print("first step position: " + position)
                                first = position.split('.')[1]
                                #print("first step: ")
                                #pprint.pprint(solution_steps_chain['0.'+first], width=300)
                                #print("master: "+master_slave_steps_chain['0.'+first][0])
                                #print("slave: "+master_slave_steps_chain['0.'+first][1])
                                step_key = '0'
                                step_number = 1
                                xprint("Sounds like a plan !")
                                end_position = position+'.'+str(j)
                                solution_steps_chain[end_position] = raw_topo_1_permutation
                                master_slave_steps_chain[end_position] = [master, slave]
                                plan['0'] = {'starting_topo': copy.deepcopy(raw_topo)}
                                # Walk the position path from the root to
                                # rebuild every step of the chain.
                                for step in end_position.split('.')[1:]:
                                    step_key += '.'+step
                                    xprint("step "+str(step_number) + ": " + str(master_slave_steps_chain[step_key]))
                                    plan[str(step_number)] = {'master': master_slave_steps_chain[step_key][0], 'slave': master_slave_steps_chain[step_key][1], 'target_topo': solution_steps_chain[step_key]}
                                    step_number += 1
                                return master_slave_steps_chain['0.'+first][0], master_slave_steps_chain['0.'+first][1]
                        else:
                            if debug:
                                xprint("============== store permutation =============")
                            solution_steps_chain[position+'.'+str(j)] = raw_topo_1_permutation
                            master_slave_steps_chain[position+'.'+str(j)] = [master, slave]
                            raw_topo_1_permutations[position+'.'+str(j)] = raw_topo_1_permutation
        # The permutations found at this depth are the next starting points.
        raw_topo_permut_dict = copy.deepcopy(raw_topo_1_permutations)
    return None, None


def solver_check(raw_topo_1_permutation, startup_nodes_by_server, startup_nodes_by_datacenter):
    """Return True when the topology is balanced by server and by datacenter."""
    server_master_repartition_dict = server_master_repartition(server_list(startup_nodes_by_server), raw_topo_1_permutation)
    datacenter_master_repartition_dict = datacenter_master_repartition(datacenter_count(startup_nodes_by_datacenter), raw_topo_1_permutation)
    if debug:
        xprint("solver_check")
        pprint.pprint(raw_topo_1_permutation, width=300)
        xprint(server_master_repartition_dict)
        xprint(datacenter_master_repartition_dict)
    name_server, master_count_server, master_total_count = detect_imbalance(server_master_repartition_dict)
    name_datacenter, master_count_datacenter, master_total_count = detect_imbalance(datacenter_master_repartition_dict)
    return name_server is None and name_datacenter is None


def simul_failover(master, slave, raw_topo):
    """Swap the roles of the nodes named `master` and `slave` in raw_topo.

    raw_topo is modified in place and returned. Only the flags field is
    rewritten; the master-id field of the nodes is left untouched. Prints
    "Failed" when nothing changed.
    """
    raw_topo_copy = copy.deepcopy(raw_topo)
    for node in raw_topo:
        if node_name(node) in (master, slave):
            switch_role(node)
    if raw_topo_copy == raw_topo:
        xprint("Failed")
        #print("raw_topo_copy: " + str(raw_topo_copy))
        #print("raw_topo: " + str(raw_topo))
    return raw_topo


def master_slaves_topo(raw_topo):
    """Map each usable master name to the list of its usable slave names.

    "Usable" means node_status() is 'ok' or 'cluster still assessing'.
    """
    usable = ['ok', 'cluster still assessing']
    master_slaves_dict = dict()
    for master in raw_topo:
        if node_role(master) == 'master' and node_status(master) in usable:
            slaves = list()
            master_slaves_dict[node_name(master)] = slaves
            for slave in raw_topo:
                if node_id(master) == masterid_of_slave(slave):
                    if node_role(slave) == 'slave' and node_status(slave) in usable:
                        slaves.append(node_name(slave))
    return master_slaves_dict


def has_master(engine, raw_topo):
    """Return True when at least one master runs on the server `engine` (an IP)."""
    for node in raw_topo:
        if node_role(node) == 'master' and node_ip(node) == engine:
            return True
    return False


def cluster_online(startup_nodes_list):
    """Return True as soon as one startup node reports 'cluster_state:ok'.

    Each node is queried with `redis-cli CLUSTER INFO` under a 1 second
    `timeout`.
    """
    for startup_node in startup_nodes_list:
        proc = Popen(["timeout", "1", redis_cli, "-h", startup_node['host'], "-p", startup_node['port'], "--raw", "CLUSTER", "INFO"], stdout=PIPE)
        result = proc.communicate()[0].split()
        if isinstance(result, list) and len(result) > 0 and result[0] == 'cluster_state:ok':
            return True
    return False


def cluster_topo(startup_nodes_list, startup_nodes_by_datacenter):
    """Fetch and parse `CLUSTER NODES` from the first startup node that answers.

    Returns (topology, topology_as_text). The topology is a list of nodes,
    each node being the list of space-separated fields of one output line,
    padded with '-' when it has fewer than 9 fields, with the datacenter
    index appended last. Returns (None, None) when no node answered.
    """
    for startup_node in startup_nodes_list:
        proc = Popen(["timeout", "1", redis_cli, "-h", startup_node['host'], "-p", startup_node['port'], "--raw", "CLUSTER", "NODES"], stdout=PIPE)
        result_str = proc.communicate()[0]
        if not isinstance(result_str, str) or result_str == '':
            continue
        lines = result_str.strip('\n').split('\n')
        result = [string.split(" ") for string in lines]
        # Remember which lines were short before padding them.
        result_bak = copy.deepcopy(result)
        for node in result:
            if len(node) < 9:
                node.append('-')
        if isinstance(result, list) and len(result) > 0:
            result = append_datacenter(result, startup_nodes_by_datacenter)
            # Rebuild the text form with the same padding and datacenter.
            tmp = ''
            for i, line in enumerate(lines):
                tmp += line
                if len(result_bak[i]) < 9:
                    tmp += " -"
                tmp += " " + str(node_datacenter(result[i])) + '\n'
            result_str = tmp
            return result, result_str
    return None, None


def append_datacenter(raw_topo, startup_nodes_by_datacenter):
    """Append to each node its datacenter index (None if unknown); in place."""
    for node in raw_topo:
        datacenter_index = get_datacenter_for_node(node, startup_nodes_by_datacenter)
        node.append(datacenter_index)
    return raw_topo


def same_cluster(startup_nodes_list, raw_topo):
    """Return True when raw_topo holds exactly the configured startup nodes."""
    if len(startup_nodes_list) != len(raw_topo):
        xprint('Found a different number of nodes.')
        return False
    # Built once, with its own loop variable: on Python 2 a list
    # comprehension's variable leaks into the enclosing scope, so reusing
    # `node` here would clobber the node about to be reported below.
    known_names = set(startup['host'] + ':' + startup['port'] for startup in startup_nodes_list)
    for node in raw_topo:
        if node_name(node) not in known_names:
            xprint(node_name(node) + ' found but unknown.')
            return False
    return True


def cluster_has_changed(current_cluster_topo, raw_topo):
    """Return True when the two topologies differ in node count, names or roles."""
    if len(current_cluster_topo) != len(raw_topo):
        xprint('Found a different number of nodes.')
        return True
    for node in raw_topo:
        found = False
        for node2 in current_cluster_topo:
            if node_name(node) == node_name(node2):
                found = True
                if node_role(node) != node_role(node2):
                    return True
                break
        if not found:
            return True
    return False


def datacenter_master_repartition(int_datacenter_count, raw_topo):
    """Map str(datacenter index) to its number of usable masters."""
    datacenter_master_repartition_dict = dict()
    for i in range(0, int_datacenter_count):
        datacenter_master_repartition_dict[str(i)] = master_count_for_datacenter(i, raw_topo)
    return datacenter_master_repartition_dict


def server_master_repartition(servers, raw_topo):
    """Map each server IP of `servers` to its number of usable masters."""
    server_master_repartition_dict = dict()
    for server in servers:
        server_master_repartition_dict[server] = master_count_for_server(server, raw_topo)
    return server_master_repartition_dict


def datacenter_count(startup_nodes_by_datacenter):
    """Return the number of configured datacenters."""
    return len(startup_nodes_by_datacenter)


def server_count(startup_nodes_by_server):
    """Return the number of configured servers."""
    return len(startup_nodes_by_server)


def server_list(startup_nodes_by_server):
    """Return the configured server IPs."""
    return startup_nodes_by_server.keys()


def master_count_for_datacenter(datacenter_index, raw_topo):
    """Count masters with status 'ok' or 'cluster still assessing' in a datacenter."""
    count = 0
    for node in raw_topo:
        if node_role(node) == 'master' and node_status(node) in ['ok', 'cluster still assessing'] and node_datacenter(node) == datacenter_index:
            count += 1
    return count


def master_count_for_server(server, raw_topo):
    """Count masters with status 'ok' or 'cluster still assessing' on a server."""
    count = 0
    for node in raw_topo:
        if node_role(node) == 'master' and node_status(node) in ['ok', 'cluster still assessing'] and node_ip(node) == server:
            count += 1
    return count


def masters_ok_count_for_cluster(raw_topo):
    """Count masters whose status is 'ok'."""
    count = 0
    for node in raw_topo:
        if node_role(node) == 'master' and node_status(node) == 'ok':
            count += 1
    return count


def masters_count_for_cluster(raw_topo):
    """Count masters, whatever their status, counting each node name once."""
    seen = set()
    for node in raw_topo:
        if node_role(node) == 'master':
            seen.add(node_name(node))
    return len(seen)


def node_datacenter(node):
    """Return the datacenter index that append_datacenter() stored on the node.

    The index is always the last field. Reading the fixed position 9 returned
    a slot range instead for any node listing more than one slot range.
    """
    return node[-1]


def node_role(node):
    """Return 'slave' when the flags field mentions it, 'master' otherwise."""
    if 'slave' in node[2]:
        return 'slave'
    return 'master'


def switch_role(node):
    """Turn a slave into a master, or a master into a slave, in place."""
    if 'slave' in node[2]:
        node[2] = node[2].replace('slave', 'master')
    else:
        node[2] = node[2].replace('master', 'slave')


def master_status_of_slave(raw_topo, slave):
    """Return the node_status() of the slave's master (None if not found)."""
    return node_status_from_id(raw_topo, masterid_of_slave(slave))


def masterid_of_slave(slave):
    """Return the master-id field of a node."""
    return slave[3]


def node_id(node):
    """Return the id field of a node."""
    return node[0]


def node_name(node):
    """Return the address field of a node ('host:port')."""
    return node[1]


def node_status(node):
    """Classify a node as 'ok', 'cluster still assessing' or 'failed'.

    'ok' needs plain master/slave flags and a 'connected' link state;
    'cluster still assessing' is a node flagged 'fail?'.
    """
    if node[2] in ['myself,slave', 'myself,master', 'slave', 'master'] and node[7] == 'connected':
        return 'ok'
    elif node[2] in ['slave,fail?', 'master,fail?']:
        return 'cluster still assessing'
    return 'failed'


def node_status_from_id(raw_topo, nodeid):
    """Return the status of the node with this id, or None when absent."""
    for node in raw_topo:
        if nodeid == node_id(node):
            return node_status(node)


def node_name_from_id(raw_topo, nodeid):
    """Return the name of the node with this id, or None when absent."""
    for node in raw_topo:
        if nodeid == node_id(node):
            return node_name(node)


def node_ip_from_id(raw_topo, nodeid):
    """Return the IP of the node with this id, or None when absent."""
    for node in raw_topo:
        if nodeid == node_id(node):
            return node_ip(node)


def node_datacenter_from_id(raw_topo, nodeid):
    """Return the datacenter index of the node with this id, or None when absent."""
    for node in raw_topo:
        if nodeid == node_id(node):
            return node_datacenter(node)


def node_object_from_node_name(node_name):
    """Split 'host:port' into {"host": host, "port": port} (both strings)."""
    parts = node_name.split(':')
    return {"host": parts[0], "port": parts[1]}


def node_name_from_node_object(node_object):
    """Build 'host:port' from a {"host", "port"} dict."""
    return node_object["host"]+":"+ node_object["port"]


def cluster_startup_topo():
    """Parse the REDIS_NODES command line argument.

    Returns a 3-tuple:
      - list of {"host", "port"} dicts, one per node,
      - list of datacenters, each a list of 'host:port' strings,
      - dict mapping each host to the list of its ports.
    """
    startup_nodes = args.REDIS_NODES
    # Clean the string
    startup_nodes = "".join(startup_nodes.split())
    startup_nodes = startup_nodes.split('/')
    startup_nodes_list = list()
    # TODO: startup_nodes_by_datacenter should be a dict
    startup_nodes_by_datacenter = list()
    startup_nodes_by_server = dict()
    for datacenter in startup_nodes:
        tmp = datacenter.split(',')
        for node in tmp:
            node_dict = node_object_from_node_name(node)
            ip = node_dict['host']
            port = node_dict['port']
            startup_nodes_list.append(node_dict)
            if ip in startup_nodes_by_server:
                startup_nodes_by_server[ip].append(port)
            else:
                startup_nodes_by_server[ip] = [port]
        startup_nodes_by_datacenter.append(tmp)
    #print(startup_nodes_by_server)
    #print(startup_nodes_by_datacenter)
    #print(startup_nodes_list)
    return startup_nodes_list, startup_nodes_by_datacenter, startup_nodes_by_server


def get_datacenter_for_node(node, startup_nodes_by_datacenter):
    """Return the index of the datacenter listing this node, or None."""
    for i, datacenter in enumerate(startup_nodes_by_datacenter):
        if node_name(node) in datacenter:
            return i
    return None


# ip, port of the slave that will replace his master
def failover_without_quorum(ip, port):
    """Send `CLUSTER FAILOVER TAKEOVER` to the slave (only printed in dry-run)."""
    xprint(redis_cli + " -h " + ip + " -p " + port + " --raw CLUSTER FAILOVER TAKEOVER")
    if not test:
        proc = Popen(["timeout", "1", redis_cli, "-h", ip, "-p", port, "--raw", "CLUSTER", "FAILOVER", "TAKEOVER"], stdout=PIPE)
        # Wait for the command to finish; its output is not used.
        proc.communicate()


# ip, port of the slave that will replace his master
def failover_with_quorum(ip, port):
    """Send `CLUSTER FAILOVER` to the slave (only printed in dry-run)."""
    xprint(redis_cli + " -h " + ip + " -p " + port + " --raw CLUSTER FAILOVER")
    if not test:
        proc = Popen(["timeout", "1", redis_cli, "-h", ip, "-p", port, "--raw", "CLUSTER", "FAILOVER"], stdout=PIPE)
        # Wait for the command to finish; its output is not used.
        proc.communicate()


def node_ip(node):
    """Return the host part of the node's 'host:port' name."""
    return node_object_from_node_name(node_name(node))['host']


def node_port(node):
    """Return the port part of the node's 'host:port' name."""
    return node_object_from_node_name(node_name(node))['port']


def master_has_at_least_one_slave(raw_topo, master):
    """Return True when some node names this master in its master-id field."""
    for node in raw_topo:
        if node_id(master) == masterid_of_slave(node):
            return True
    return False


def get_one_slave(raw_topo, master):
    """Return the first node naming this master in its master-id field, or None."""
    for node in raw_topo:
        if node_id(master) == masterid_of_slave(node):
            return node
    return None


# NOTE(review): MyHandler.do_GET continues past this point in the file. Only
# the part visible here is re-indented; its statements are unchanged.
class MyHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        global sleep_until
        global slave_only_engine
        global no_repartition_duration
        global cluster_state
        global request_active
        global debug
        if self.path == '/debug/enable':
            self.send_response(200)
            debug = True
        elif self.path == '/debug/disable':
            self.send_response(200)
            debug = False
        elif self.path == '/version':
            self.send_response(200)
            self.send_header('Content-type', 'text/plain')
            self.end_headers()
            self.wfile.write(version+'\n')
        elif self.path == '/help':
            self.send_response(200)
            self.send_header('Content-type', 'text/plain')
            self.end_headers()
            self.wfile.write(api_help())
        elif self.path == '/cluster_status':
            self.send_response(200)
            self.send_header('Content-type', 'text/plain')
            self.end_headers()
            self.wfile.write(cluster_state+'\n')
        elif self.path == '/manager_status':
            self.send_response(200)
            self.send_header('Content-type', 'text/plain')
            self.end_headers()
            delta = time.mktime(time.gmtime()) - last_loop_epoch
            if delta > unresponsive_timeout:
                xprint("manager main loop is unresponsive!")
answer = "failed" cluster_state = 'Unknown state' request_active = False else: answer = manager_status if sleep_until != 0: answer += " asleep" if no_repartition_until != 0: answer += " repartition disabled" self.wfile.write(answer) elif self.path == '/request_active': self.send_response(200) self.send_header('Content-type', 'text/plain') self.end_headers() if request_active: self.wfile.write("yes") else: self.wfile.write("no") elif "/sleep" in self.path: try: sleep_duration = int(self.path.split("=")[1]) sleep_until = time.mktime(time.gmtime()) + sleep_duration self.send_response(200) self.send_header('Content-type', 'text/plain') self.end_headers() self.wfile.write("OK") cluster_state = 'manager disabled' except: self.send_response(400) elif "/prepare_for_reboot" in self.path: bad_request = False try: no_repartition_duration_req = int(self.path.split("duration=")[1]) slave_only_engine_req = self.path.split("/")[2].split("&")[0] if not slave_only_engine_req in [node_ip(node) for node in raw_topo]: self.send_response(400) bad_request = True except: self.send_response(400) bad_request = True if not bad_request: if manager_status == 'passive': self.send_response(200) self.send_header('Content-type', 'text/plain') self.end_headers() self.wfile.write("CANNOT PROCEED: passive manager") elif manager_status == 'starting': self.send_response(200) self.send_header('Content-type', 'text/plain') self.end_headers() self.wfile.write("CANNOT PROCEED: manager is starting") elif time.mktime(time.gmtime()) - last_loop_epoch > 10: self.send_response(500) self.send_header('Content-type', 'text/plain') self.end_headers() self.wfile.write("CANNOT PROCEED: failed manager") elif no_repartition_duration != 0 and slave_only_engine != slave_only_engine_req: xprint("A request has already been made to only have slaves on engine " + slave_only_engine + ".") self.send_response(403) self.send_header('Content-type', 'text/plain') self.end_headers() self.wfile.write("SIMILAR REQUEST ALREADY IN 
PROGRESS") elif no_repartition_duration != 0 and slave_only_engine == slave_only_engine_req and not has_master(slave_only_engine, raw_topo): self.send_response(200) self.send_header('Content-type', 'text/plain') self.end_headers() self.wfile.write("DONE") else: slave_only_engine = slave_only_engine_req no_repartition_duration = no_repartition_duration_req self.send_response(200) self.send_header('Content-type', 'text/plain') self.end_headers() self.wfile.write("WAIT") else: self.send_response(404) def log_message(self, format, *args): if debug: xprint("%s - - [%s] %s" % (self.address_string(), self.log_date_time_string(), format%args)) class WebThread(threading.Thread): def run(self): httpd.serve_forever() if __name__ == "__main__": global cluster_state cluster_state = 'manager disabled' global manager_status manager_status = 'starting' global request_active request_active = False global sleep_until sleep_until = 0 global no_repartition_until no_repartition_until = 0 global no_repartition_duration no_repartition_duration = 0 global slave_only_engine slave_only_engine = None global raw_topo raw_topo = None global last_loop_epoch global debug debug = False httpd = SocketServer.TCPServer(("", http_port), MyHandler, bind_and_activate=False) httpd.allow_reuse_address = True httpd.server_bind() httpd.server_activate() webserver_thread = WebThread() webserver_thread.start() xprint(api_help()) try: main() except KeyboardInterrupt: xprint('Interrupted') httpd.shutdown() httpd.server_close() sys.exit(0) except: xprint('We crashed!') xprint(traceback.format_exc()) httpd.shutdown() httpd.server_close() sys.exit(0) # vim: set ts=4 sw=4 sts=4 et :