import glob import json import sys import threading import time import requests from utils import Problem, GroupPart import utils.adcclilib as adccli # import adc2019.client.adcclient class Host(object): def __init__(self, config): self.type = 'host' self.config = config self.problems = dict() self.worker_manager = None self.request = dict() self.problem_to_request = dict() self._load_problems(config['problem_path']) self._setup_workers() def __repr__(self): return "Host" def _load_problems(self, path): problems_path = glob.glob(path) problems_path = sorted(problems_path) for v in problems_path: problem = Problem(v, self.config['solution_path']) _key = problem.name self.problems[_key] = problem self.problem_to_request[_key] = list() def _setup_workers(self): self.worker_manager = WorkerManager(self.config['address']) for v in self.config['worker']: self.worker_manager.add_worker(v) def get_workers(self): return self.worker_manager.get_workers() def distribute_problem(self, problem_key, solvers=None): problem = self.get_problem(problem_key) if problem is None: return {'status': 'key error'} else: # request = Request(self.worker_manager, problem.get_dict(), solvers=solvers) if solvers is None: _solvers = self.worker_manager.get_workers() else: _solvers = dict() for k, v in self.worker_manager.get_workers().items(): if k in solvers: _solvers[k] = v for _, v in _solvers.items(): request = Request(self.worker_manager, problem.get_dict(), v) request.broadcast() request_id = request.get_id() self.request[request_id] = request self.problem_to_request[problem_key].append(request_id) return { 'status': 'processed', } def get_problems(self): return self.problems def get_problem(self, _key): if _key in self.problems: return self.problems[_key] else: return None def store_solution(self, solution): if 'part_id' in solution: if solution['status'] != 'done': return {'status': 'ignored'} problem_key = solution['problem'] request_id = solution['request_id'] if request_id in self.request: self.request[request_id].store_response(solution) if problem_key in self.problems: res = self.problems[problem_key].put_partial_solution(solution) else: res = False print(solution['solution']) print(solution['line_map'], solution['block_map']) if res: merge_problem = self.problems[problem_key].partial_merge_problem # print(merge_problem) merge_problem['request_id'] = solution['request_id'] merge_problem['timeout'] = 10000 for k, v in self.worker_manager.workers.items(): if v.role == 'merge_solver': v.post('solve', merge_problem) return {'status': 'done'} else: request_id = solution['request_id'] if request_id in self.request: self.request[request_id].store_response(solution) else: print(f'W: Unknown request_id: {request_id}') problem_key = solution['problem'] if problem_key in self.problems: self.problems[problem_key].put_solution(solution) print(solution['solution']) return {'status': 'registered'} else: return {'status': 'error'} def save_solution(self, problem_key): problem = self.get_problem(problem_key) if problem is not None: problem.save_best_solution() return {'status': 'saved'} else: return {'status': 'failed'} def get_request_status(self, request_id): if request_id in self.request: return self.request[request_id].get_status() else: return {'status': 'unknown request'} def get_request_by_problem(self, problem_key): if problem_key in self.problem_to_request: request_statuses = list() for rid in self.problem_to_request[problem_key]: r = self.request[rid] request_statuses.append(r.get_status()) return reversed(request_statuses) else: return None def get_viewer_data(self, problem_key, solution_id): problem = self.get_problem(problem_key) if problem is None: return None problem_data = problem.get_d3json() if solution_id is None: solution_data = None else: solution = problem.get_solution(solution_id) solution_data = solution.get_d3json() return { 'problem': problem_data, 'solution': solution_data } def call_api(self, method, cmd, params): if cmd == 'role': # サーバの役割確認 return {'role': self.type} elif cmd == 'problem/solve': # params['problem']に指定された問題をworkerに配信 problem_key = params['problem'] if 'solvers' in params: solvers = params['solvers'] else: solvers = None return self.distribute_problem(problem_key, solvers) elif cmd == 'problem/solution': self.store_solution(params) return {'status': 'received'} elif cmd == 'problem/save': problem_key = params['problem'] return self.save_solution(problem_key) elif cmd == 'request/status': request_id = float(params['request_id']) return self.get_request_status(request_id) elif cmd == 'stop': self.worker_manager.request_stop() return {} elif cmd == 'cancel': self.worker_manager.request_cancel(params) return {'status': 'canceled'} elif cmd == 'worker/reset': self.worker_manager.reset_worker(params['address']) return {'status': 'reset'} elif cmd == 'worker/stop': self.worker_manager.request_stop(params['address']) return {'status': 'stopped'} elif cmd == 'worker/status': self.worker_manager.update_status(params['address'], params['status']) return {'status': 'updated'} elif cmd == 'view': problem_key = params['problem'] solution_id = params['solution'] return self.get_viewer_data(problem_key, solution_id) elif cmd == 'adccli/login': r = adccli.login(self.config['adccli']['url'], self.config['adccli']['username'], self.config['adccli']['password']) return {'status': r} elif cmd == 'adccli/logout': r = adccli.logout() return {'status': r} elif cmd == 'adccli/whoami': r = adccli.whoami() return {'message': r, 'status': r == self.config['adccli']} elif cmd == 'adccli/download-problem': out_abspath = os.path.abspath(self.config['problem_path']) r = adccli.get_q_all(out_abspath) self._load_problems(self.config['problem_path']) return {'status': r} elif cmd == 'adccli/upload-solution': problem_key = params['problem'] problem = self.problems[problem_key] if problem.best_solution_key is None: res = {'status': "Required to 'save' before submission"} else: solution_path = f"{self.config['solution_path']}/{problem_key}/submit/{problem_key}.txt" qnumber = problem_key.replace("NL_Q", "").replace(".txt", "") solution_abspath = os.path.abspath(solution_path) r = adccli.put_a(qnumber, solution_abspath) mes = r + "\n" best_solution_key = problem.best_solution_key best_solution = problem.get_solution(best_solution_key) cputime = f'{best_solution.elapsed_time:.3f}' info_mes = f'Solver: {best_solution.worker}, RID: {best_solution.request_id}' r = adccli.put_a_info(qnumber, cputime, "0", info_mes) mes += r res = {'status': mes} return res else: return None class WorkerManager(object): def __init__(self, host_address): self.workers = dict() self.host = host_address # self.heartbeat_thread = threading.Thread(name='heartbeat', target=self.heartbeat, daemon=True) # self.heartbeat_thread.start() # def heartbeat(self): # while True: # for k, v in self.workers.items(): # v.update_status() # time.sleep(1) def add_worker(self, conf): worker_conf = dict() worker_conf.update(conf) worker_conf['host'] = self.host worker_address = worker_conf['address'] self.workers[worker_address] = Worker(worker_conf) def get_workers(self): return self.workers def update_status(self, address, status): self.workers[address].status = status def reset_worker(self, address): self.workers[address].configure() def request_stop(self, address=None): if address is None: self.broadcast('stop', {}) else: self.workers[address].post('stop', {}) def request_cancel(self, params): self.broadcast('cancel', params) def broadcast(self, cmd, params, solvers=None): threads = [] def _sender(_worker, _cmd, _params): _worker.post(_cmd, _params) for k, v in self.workers.items(): if (not solvers is None) and (not k in solvers): continue _th = threading.Thread(name=v.address, target=_sender, args=(v, cmd, params), daemon=True) _th.start() threads.append(_th) class Worker(object): def __init__(self, params): self.address = params['address'] self.name = params['name'] self.host = params['host'] self.role = params['role'] if not 'partial_mode' in params: self.partial_mode = False else: self.partial_mode = params['partial_mode'] self.params = params self.status = 'Ready' self.configure() def update_status(self): res = self.post('status', None) if res is None: self.status = 'Not connected' else: self.status = res.json()['status'] def post(self, path, data): try: response = requests.post( f'http://{self.address}/api/{path}', json.dumps(data), headers={'Content-Type': 'application/json'}, timeout=2) print(f"I: Post to {self.address}, API Cmd: {path}") except Exception as e: print(f"W: Failed to connect {self.address}") self.status = 'Not connected' response = None return response def configure(self): r = self.post('role', self.params) class Request(object): def __init__(self, worker_manager, data, worker, timeout=10000): # self.worker_manager = worker_manager self.data = data self.timeout = timeout self.request_id = time.time() self.request_time = None self.done_time = None # if solvers is None: # self.solvers = list(self.worker_manager.get_workers().keys()) # else: # self.solvers = solvers self.solver = worker self.response = dict() # for w in self.solvers: # self.response[w] = list() self.response = list() if self.solver.partial_mode: self.part_nums = len(data['group_problems']) self.solved = [0 for v in range(self.part_nums + 1)] else: self.part_nums = 1 self.solved = [0] @property def request_data(self): data = self.data data['request_id'] = self.request_id data['timeout'] = self.timeout return data def get_id(self): return self.request_id def get_dict(self): return self.request_data def store_response(self, data): # worker = data['worker'] # self.response[worker].append(data) # self.response[worker] = data self.response.append(data) print(data) if 'part_id' in data: idx = data['part_id'] print(idx) if data['status'] == 'done': self.solved[idx] = 1 else: self.solved[idx] = -1 if all([v!=0 for v in self.solved]) and any([v<0 for v in self.solved]): self.solved[-1] = -1 else: if data['status'] == 'done': self.solved[-1] = 1 else: self.solved[-1] = -1 self.done_time = time.time() def get_status(self): status = '' if self.solved[-1] == 1: elapsed_time = self.done_time - self.request_time et_minutes = int(elapsed_time // 60) et_seconds = int(elapsed_time % 60) status = f'done ({self.solved[-1]}) [{et_minutes}:{et_seconds:02}]' elif self.solved[-1] == -1: status = 'failed' else: if self.solver.status == 'Not connected': status = 'Not connected' else: if len(self.solved) == 1: status_mes = 'Running' elif len(self.solved) > 1: counter = sum([v>0 for v in self.solved]) status_mes = f'{counter}/{len(self.solved)}' elapsed_time = time.time() - self.request_time et_minutes = int(elapsed_time // 60) et_seconds = int(elapsed_time % 60) elapsed_time_str = f'{et_minutes}:{et_seconds:02}' status = f'{status_mes} [{elapsed_time_str}]' return { 'status': status, 'worker': self.solver.address, # 'solutions': response_count, # 'progress': progress, # 'worker_status': worker_status } def broadcast(self): def _sender(): self.solver.post('solve', self.request_data) _th = threading.Thread(name=f'request_sender_{self.solver.address}', target=_sender, daemon=True) _th.start() self.request_time = time.time()