import glob
import json
import os
import sys
import threading
import time

import requests

from utils import Problem, GroupPart
import utils.adcclilib as adccli
# import adc2019.client.adcclient


class Host(object):
    """Coordinator that distributes problems to workers and collects solutions.

    Instance state:
      * ``problems``: problem name -> ``Problem`` loaded from
        ``config['problem_path']``.
      * ``request``: request id -> ``Request`` for every dispatched request.
      * ``problem_to_request``: problem name -> list of request ids, in
        dispatch order.
      * ``worker_manager``: ``WorkerManager`` built from ``config['worker']``.
    """

    def __init__(self, config):
        """Load the problems and register the workers described by *config*.

        Keys of *config* used by this class: ``problem_path`` (a glob
        pattern), ``solution_path``, ``address``, ``worker`` and, for the
        ``adccli/*`` commands, ``adccli``.
        """
        self.type = 'host'
        self.config = config
        self.problems = dict()
        self.worker_manager = None
        self.request = dict()
        self.problem_to_request = dict()
        self._load_problems(config['problem_path'])
        self._setup_workers()

    def __repr__(self):
        return "Host"

    def _load_problems(self, path):
        """(Re)load every problem file matching the glob *path*, in sorted order.

        A problem loaded again (e.g. by ``adccli/download-problem``) is
        replaced by a fresh ``Problem`` object, but its list of request ids is
        kept so requests dispatched earlier remain reachable by problem.
        """
        for problem_file in sorted(glob.glob(path)):
            problem = Problem(problem_file, self.config['solution_path'])
            key = problem.name
            self.problems[key] = problem
            # setdefault: a reload must not discard ids of existing requests.
            self.problem_to_request.setdefault(key, list())

    def _setup_workers(self):
        """Create the WorkerManager and register each entry of ``config['worker']``."""
        self.worker_manager = WorkerManager(self.config['address'])
        for worker_conf in self.config['worker']:
            self.worker_manager.add_worker(worker_conf)

    def get_workers(self):
        """Return the address -> Worker mapping held by the worker manager."""
        return self.worker_manager.get_workers()

    def distribute_problem(self, problem_key, solvers=None):
        """Send problem *problem_key* to every selected non-merge worker.

        Args:
            problem_key: name of a loaded problem.
            solvers: optional collection of worker addresses to use; ``None``
                selects every registered worker.

        Workers whose role is ``'merge_solver'`` are not sent the problem;
        their addresses are attached to each request as ``merge_solvers``.
        One ``Request`` is created and sent per remaining worker.

        Returns:
            ``{'status': 'processed'}``, or ``{'status': 'key error'}`` when
            the problem is unknown.
        """
        problem = self.get_problem(problem_key)
        if problem is None:
            return {'status': 'key error'}

        merge_solvers = list()
        targets = dict()
        for address, worker in self.worker_manager.get_workers().items():
            if (solvers is not None) and (address not in solvers):
                continue
            if worker.role == 'merge_solver':
                merge_solvers.append(address)
            else:
                targets[address] = worker

        for worker in targets.values():
            request = Request(self.worker_manager, problem.get_dict(), worker,
                              merge_solvers=merge_solvers)
            request_id = request.get_id()
            # Register before sending: the worker may call back with this id
            # as soon as the (threaded) send completes.
            self.request[request_id] = request
            self.problem_to_request[problem_key].append(request_id)
            request.broadcast()
        return {
            'status': 'processed',
        }

    def get_problems(self):
        """Return the problem name -> Problem mapping."""
        return self.problems

    def get_problem(self, _key):
        """Return the problem named *_key*, or ``None`` if it is not loaded."""
        return self.problems.get(_key)

    def store_solution(self, solution):
        """Record a result posted back by a worker.

        Payloads carrying ``'part_id'`` are partial-mode results; all others
        are complete results (including merge results).

        Returns:
            For partial results ``{'status': 'ignored'}`` (non-'done' part) or
            ``{'status': 'done'}``; for complete results
            ``{'status': 'registered'}`` or ``{'status': 'error'}`` (unknown
            problem).
        """
        if 'part_id' in solution:
            return self._store_partial_solution(solution)
        return self._store_full_solution(solution)

    def _store_partial_solution(self, solution):
        """Store one part of a partial-mode result; start the merge once all parts are in."""
        request_id = solution['request_id']
        # The id may be unknown to this host; every use below is guarded.
        request = self.request.get(request_id)
        if solution['status'] != 'done':
            if request is not None:
                request.store_failed(solution)
            return {'status': 'ignored'}

        problem_key = solution['problem']
        if request is not None:
            request.store_response(solution)
        if problem_key in self.problems:
            parts_valid = self.problems[problem_key].put_partial_solution(solution)
        else:
            parts_valid = False
        print(solution['solution'])
        print(solution.get('line_map'), solution.get('block_map'))

        if parts_valid and request is not None and request.is_receive_completed:
            self._dispatch_merge(problem_key, solution)
        return {'status': 'done'}

    def _dispatch_merge(self, problem_key, solution):
        """Post the problem's merge job to the merge workers.

        When the partial solution names non-empty ``merge_solvers``, only
        those workers receive the job; otherwise every worker whose role is
        ``'merge_solver'`` does.
        """
        merge_problem = self.problems[problem_key].partial_merge_problem
        merge_problem['request_id'] = solution['request_id']
        merge_problem['timeout'] = 10000
        requested = solution.get('merge_solvers') or []
        for address, worker in self.worker_manager.workers.items():
            if requested:
                if address in requested:
                    worker.post('solve', merge_problem)
            elif worker.role == 'merge_solver':
                worker.post('solve', merge_problem)

    def _store_full_solution(self, solution):
        """Store a complete (non-partial) result for its request and problem."""
        request_id = solution['request_id']
        request = self.request.get(request_id)
        if request is not None:
            request.store_response(solution)
        else:
            print(f'W: Unknown request_id: {request_id}')
        problem = self.problems.get(solution['problem'])
        if problem is None:
            return {'status': 'error'}
        problem.put_solution(solution)
        print(solution['solution'])
        return {'status': 'registered'}

    def save_solution(self, problem_key):
        """Ask problem *problem_key* to save its best solution."""
        problem = self.get_problem(problem_key)
        if problem is None:
            return {'status': 'failed'}
        problem.save_best_solution()
        return {'status': 'saved'}

    def get_request_status(self, request_id):
        """Return the status dict of request *request_id*."""
        request = self.request.get(request_id)
        if request is None:
            return {'status': 'unknown request'}
        return request.get_status()

    def get_request_by_problem(self, problem_key):
        """Return the statuses of all requests for *problem_key*, newest first.

        Returns a list (JSON-serializable), or ``None`` for an unknown problem.
        """
        if problem_key not in self.problem_to_request:
            return None
        statuses = [self.request[rid].get_status()
                    for rid in self.problem_to_request[problem_key]]
        return statuses[::-1]

    def get_viewer_data(self, problem_key, solution_id):
        """Return the d3 JSON of a problem and, optionally, one of its solutions.

        Returns ``None`` for an unknown problem; ``'solution'`` is ``None``
        when *solution_id* is ``None`` or no solution object is returned.
        """
        problem = self.get_problem(problem_key)
        if problem is None:
            return None
        problem_data = problem.get_d3json()
        solution_data = None
        if solution_id is not None:
            solution = problem.get_solution(solution_id)
            if solution is not None:
                solution_data = solution.get_d3json()
        return {
            'problem': problem_data,
            'solution': solution_data
        }

    def call_api(self, method, cmd, params):
        """Handle API command *cmd* with payload *params*.

        *method* is accepted but not used here. Returns a JSON-serializable
        result, or ``None`` for an unknown command.
        """
        if cmd == 'role':
            # Report this server's role.
            return {'role': self.type}
        elif cmd == 'problem/solve':
            # Distribute the problem named by params['problem'] to the workers.
            return self.distribute_problem(params['problem'], params.get('solvers'))
        elif cmd == 'problem/solution':
            self.store_solution(params)
            return {'status': 'received'}
        elif cmd == 'problem/save':
            return self.save_solution(params['problem'])
        elif cmd == 'request/status':
            # Request ids are floats; float() also accepts ids sent as strings.
            request_id = float(params['request_id'])
            return self.get_request_status(request_id)
        elif cmd == 'stop':
            self.worker_manager.request_stop()
            return {}
        elif cmd == 'cancel':
            self.worker_manager.request_cancel(params)
            for rid in self.problem_to_request.get(params['problem'], []):
                self.request[rid].cancel()
            return {'status': 'canceled'}
        elif cmd == 'worker/reset':
            self.worker_manager.reset_worker(params['address'])
            return {'status': 'reset'}
        elif cmd == 'worker/stop':
            self.worker_manager.request_stop(params['address'])
            return {'status': 'stopped'}
        elif cmd == 'worker/status':
            self.worker_manager.update_status(params['address'], params['status'])
            worker_params = params.get('params') or {}
            if 'request_id' in worker_params:
                request = self.request.get(worker_params['request_id'])
                if request is not None:
                    request.set_running()
            return {'status': 'updated'}
        elif cmd == 'view':
            return self.get_viewer_data(params['problem'], params['solution'])
        elif cmd == 'adccli/login':
            adccli_conf = self.config['adccli']
            r = adccli.login(adccli_conf['url'], adccli_conf['username'], adccli_conf['password'])
            return {'status': r}
        elif cmd == 'adccli/logout':
            return {'status': adccli.logout()}
        elif cmd == 'adccli/whoami':
            r = adccli.whoami().strip()
            return {'status': r, 'is_logged_in': r == self.config['adccli']['username']}
        elif cmd == 'adccli/download-problem':
            out_abspath = os.path.abspath(os.path.dirname(self.config['problem_path']))
            r = adccli.get_q_all(out_abspath)
            self._load_problems(self.config['problem_path'])
            return {'status': r}
        elif cmd == 'adccli/upload-solution':
            return self._upload_solution(params['problem'])
        else:
            return None

    def _upload_solution(self, problem_key):
        """Submit the saved best solution of *problem_key* and its metadata via adccli."""
        problem = self.get_problem(problem_key)
        if problem is None:
            return {'status': f'Unknown problem: {problem_key}'}
        if problem.best_solution is None:
            return {'status': "Required to 'save' before submission"}
        solution_path = f"{self.config['solution_path']}/submit/{problem_key}.txt"
        # Strip the "Q" prefix and ".txt" suffix to get the question number for adccli.
        qnumber = problem_key.replace("Q", "").replace(".txt", "")
        solution_abspath = os.path.abspath(solution_path)
        mes = adccli.put_a(qnumber, solution_abspath) + "\n"
        best_solution = problem.get_solution(problem.best_solution)
        cputime = f'{best_solution.elapsed_time:.3f}'
        info_mes = f'Solver: {best_solution.worker}, RID: {best_solution.request_id}'
        mes += adccli.put_a_info(qnumber, cputime, "0", info_mes)
        return {'status': mes}


class WorkerManager(object):
    """Registry of Worker handles keyed by worker address."""

    def __init__(self, host_address):
        self.workers = dict()
        # Passed to every worker's config as 'host'.
        self.host = host_address

    def add_worker(self, conf):
        """Register a worker from *conf* (copied, with 'host' added)."""
        worker_conf = dict(conf)
        worker_conf['host'] = self.host
        self.workers[worker_conf['address']] = Worker(worker_conf)

    def get_workers(self):
        """Return the address -> Worker mapping."""
        return self.workers

    def update_status(self, address, status):
        """Overwrite the locally cached status of the worker at *address*."""
        self.workers[address].status = status

    def reset_worker(self, address):
        """Re-send the configuration to the worker at *address*."""
        self.workers[address].configure()

    def request_stop(self, address=None):
        """Send 'stop' to one worker, or to all workers when *address* is None."""
        if address is None:
            self.broadcast('stop', {})
        else:
            self.workers[address].post('stop', {})

    def request_cancel(self, params):
        """Send 'cancel' with *params* to all workers."""
        self.broadcast('cancel', params)

    def broadcast(self, cmd, params, solvers=None):
        """Post *cmd* to every worker (or only those in *solvers*).

        Each post runs on its own daemon thread; this method does not wait
        for them to finish.
        """
        def _sender(_worker, _cmd, _params):
            _worker.post(_cmd, _params)

        for address, worker in self.workers.items():
            if (solvers is not None) and (address not in solvers):
                continue
            threading.Thread(name=worker.address, target=_sender,
                             args=(worker, cmd, params), daemon=True).start()


class Worker(object):
    """Handle for one remote worker reached at ``http://<address>/api/<path>``."""

    def __init__(self, params):
        self.address = params['address']
        self.name = params['name']
        self.host = params['host']
        self.role = params['role']
        self.partial_mode = params.get('partial_mode', False)
        self.params = params
        self.status = 'Ready'
        # Sends the configuration to the worker immediately (network call).
        self.configure()

    def update_status(self):
        """Poll the worker's 'status' API and cache the reported status."""
        res = self.post('status', None)
        if res is None:
            self.status = 'Not connected'
        else:
            self.status = res.json()['status']

    def post(self, path, data):
        """POST *data* as JSON to the worker's API *path*.

        Returns the ``requests`` response, or ``None`` on any failure (in
        which case ``status`` is set to 'Not connected').
        """
        try:
            response = requests.post(
                f'http://{self.address}/api/{path}',
                json.dumps(data),
                headers={'Content-Type': 'application/json'},
                timeout=2)
            print(f"I: Post to {self.address}, API Cmd: {path}")
        except Exception:
            # Best effort: an unreachable worker must not break the host.
            print(f"W: Failed to connect {self.address}")
            self.status = 'Not connected'
            response = None
        return response

    def configure(self):
        """Send this worker's configuration to its 'role' API."""
        self.post('role', self.params)


class Request(object):
    """One solve request sent to a single worker, and its progress.

    ``solved`` holds one slot per part (partial mode only) followed by a final
    slot for the overall result: 0 = pending, 1 = done, -1 = failed/canceled.
    """

    # Serializes id generation so requests created back-to-back get distinct ids.
    _id_lock = threading.Lock()
    _last_request_id = 0.0

    def __init__(self, worker_manager, data, worker, timeout=10000, merge_solvers=None):
        """Create a request for *worker*.

        *worker_manager* is accepted for interface compatibility but unused.
        In partial mode *data* must contain ``'group_problems'``.
        """
        self.data = data
        self.timeout = timeout
        self.request_id = self._new_request_id()
        self.request_time = None
        self.start_time = None
        self.done_time = None
        self.solver = worker
        self.merge_solvers = list() if merge_solvers is None else merge_solvers
        self.response = list()
        if self.solver.partial_mode:
            self.part_nums = len(data['group_problems'])
            self.solved = [0] * (self.part_nums + 1)
        else:
            self.part_nums = 1
            self.solved = [0]
        self.is_processed = False

    @classmethod
    def _new_request_id(cls):
        """Return a float timestamp id strictly greater than any issued before.

        ``time.time()`` can return the same value for consecutive calls, and
        ids are dictionary keys on the host, so ties are bumped forward.
        """
        with cls._id_lock:
            rid = time.time()
            if rid <= cls._last_request_id:
                rid = cls._last_request_id + 1e-6
            cls._last_request_id = rid
            return rid

    @property
    def request_data(self):
        """Payload sent to the worker: a copy of ``data`` plus request metadata."""
        data = dict(self.data)  # copy: never mutate the caller's problem dict
        data['request_id'] = self.request_id
        data['timeout'] = self.timeout
        data['merge_solvers'] = list(self.merge_solvers)
        return data

    @property
    def is_receive_completed(self):
        """True once every part slot is non-zero (always True outside partial mode)."""
        return all(v != 0 for v in self.solved[:-1])

    def get_id(self):
        return self.request_id

    def get_dict(self):
        return self.request_data

    def set_running(self):
        """Mark the request as picked up by the worker; record the first start time."""
        self.is_processed = True
        if self.start_time is None:
            self.start_time = time.time()

    def cancel(self):
        """Mark the request canceled unless it has already finished."""
        if self.solved[-1] == 0:
            self.solved[-1] = -1

    def _mark_part(self, idx, value):
        """Record part *idx*; fail the request once all parts reported and any failed."""
        self.solved[idx] = value
        parts = self.solved[:-1]
        if all(v != 0 for v in parts) and any(v < 0 for v in parts):
            self.solved[-1] = -1
            self.done_time = time.time()

    def _finish(self, value):
        """Set the overall result and record the completion time."""
        self.solved[-1] = value
        self.done_time = time.time()

    def store_failed(self, data):
        """Record a failed result (a part if *data* has 'part_id', else the whole request)."""
        self.response.append(data)
        if 'part_id' in data:
            self._mark_part(data['part_id'], -1)
        else:
            self._finish(-1)

    def store_response(self, data):
        """Record a result; ``data['status'] == 'done'`` counts as success."""
        self.response.append(data)
        print(data)
        if 'part_id' in data:
            idx = data['part_id']
            print(idx)
            self._mark_part(idx, 1 if data['status'] == 'done' else -1)
        else:
            self._finish(1 if data['status'] == 'done' else -1)

    def _ensure_start_time(self):
        """Fill in start_time when the worker never reported 'running'.

        The send time is used when known so elapsed time is not measured
        against "now", which may be later than done_time.
        """
        if self.start_time is None:
            self.start_time = self.request_time if self.request_time is not None else time.time()

    @staticmethod
    def _format_elapsed(seconds):
        """Format a duration as 'M:SS', clamping negative values to zero."""
        seconds = max(0.0, seconds)
        return f'{int(seconds // 60)}:{int(seconds % 60):02}'

    def get_status(self):
        """Return ``{'status': <human-readable text>, 'worker': <address>}``."""
        overall = self.solved[-1]
        total = len(self.solved)
        if overall == 1:
            self._ensure_start_time()
            elapsed = self._format_elapsed(self.done_time - self.start_time)
            status = f'done ({overall}) [{elapsed}]'
        elif overall == -1:
            self._ensure_start_time()
            counter = sum(v != 0 for v in self.solved)
            if self.done_time is None:
                # Only cancel() sets -1 without a completion time.
                status = f'canceled ({counter}/{total} done)'
            else:
                elapsed = self._format_elapsed(self.done_time - self.start_time)
                status = f'failed ({counter}/{total} done) [{elapsed}]'
        elif self.solver.status == 'Not connected':
            status = 'Not connected'
        elif not self.is_processed:
            status = 'Queued'
        else:
            if total == 1:
                status_mes = 'Running'
            else:
                counter = sum(v != 0 for v in self.solved)
                status_mes = f'{counter}/{total}'
            self._ensure_start_time()
            status = f'{status_mes} [{self._format_elapsed(time.time() - self.start_time)}]'
        return {
            'status': status,
            'worker': self.solver.address
        }

    def broadcast(self):
        """Post the request to the worker's 'solve' API on a daemon thread."""
        def _sender():
            self.solver.post('solve', self.request_data)

        self.request_time = time.time()
        threading.Thread(name=f'request_sender_{self.solver.address}',
                         target=_sender, daemon=True).start()