"""Host-side coordinator: loads problems, dispatches them to workers over
HTTP, collects the returned solutions and serves a small command API."""

import glob
import json
import os
import sys
import threading
import time

import requests

from utils import Problem
import utils.adcclilib as adccli


class Host(object):
    """Owns the problem set, the worker pool and the outstanding requests."""

    def __init__(self, config):
        """Load the problems and register the workers described by `config`.

        Keys read from `config` in this class: 'problem_path',
        'solution_path', 'address' and 'worker'.
        """
        self.type = 'host'
        self.config = config
        self.problems = dict()
        self.worker_manager = None
        self.request = dict()
        self.processing_request = None

        self._load_problems(config['problem_path'])
        self._setup_workers()

    def __repr__(self):
        return "Host"

    def _load_problems(self, path):
        """Load every file matching the glob pattern `path`.

        Each match becomes a `Problem` stored in `self.problems` under its
        `name`; an already-loaded problem with the same name is replaced.
        """
        for problem_file in sorted(glob.glob(path)):
            problem = Problem(problem_file, self.config['solution_path'])
            self.problems[problem.name] = problem

    def _setup_workers(self):
        """Create the worker manager and register each configured worker."""
        self.worker_manager = WorkerManager(self.config['address'])
        for worker_conf in self.config['worker']:
            self.worker_manager.add_worker(worker_conf)

    def get_workers(self):
        """Return the dict of registered workers keyed by address."""
        return self.worker_manager.get_workers()

    def distribute_problem(self, problem_key, solvers=None):
        """Broadcast the problem named `problem_key` to the workers.

        Returns {'status': 'key error'} for an unknown key, otherwise a dict
        with 'status', 'request_id' and 'timeout' of the new request.
        """
        problem = self.get_problem(problem_key)
        if problem is None:
            return {'status': 'key error'}

        request = Request(self.worker_manager, problem.get_dict(),
                          solvers=solvers)
        request_id = request.get_id()
        self.request[request_id] = request
        request.broadcast()

        return {
            'status': 'processed',
            'request_id': request_id,
            'timeout': request.timeout,
        }

    def get_problems(self):
        """Return the dict of loaded problems keyed by problem name."""
        return self.problems

    def get_problem(self, _key):
        """Return the problem stored under `_key`, or None if unknown."""
        return self.problems.get(_key)

    def store_solution(self, solution):
        """Record a solution reported by a worker.

        The solution is attached to its originating request when the
        'request_id' is known (a warning is printed otherwise) and then
        registered on the problem named by solution['problem'].

        Returns {'status': 'registered'} or, for an unknown problem,
        {'status': 'error'}.
        """
        # TODO: also add proper validation of request_id.
        request_id = solution['request_id']
        if request_id in self.request:
            self.request[request_id].store_response(solution)
        else:
            print(f'W: Unknown request_id: {request_id}')

        problem_key = solution['problem']
        if problem_key not in self.problems:
            return {'status': 'error'}

        self.problems[problem_key].put_solution(solution)
        print(solution['solution'])
        return {'status': 'registered'}

    def save_solution(self, problem_key):
        """Ask the named problem to save its best solution.

        Returns {'status': 'saved'}, or {'status': 'failed'} when the
        problem key is unknown.
        """
        problem = self.get_problem(problem_key)
        if problem is None:
            return {'status': 'failed'}

        problem.save_best_solution()
        return {'status': 'saved'}

    def get_request_status(self, request_id):
        """Return the status dict of a request, or an 'unknown request'
        status when `request_id` was never issued by this host."""
        request = self.request.get(request_id)
        if request is None:
            return {'status': 'unknown request'}
        return request.get_status()

    def get_viewer_data(self, problem_key, solution_id):
        """Return the viewer data for a problem and, optionally, a solution.

        Returns None for an unknown problem. The 'solution' entry is None
        when `solution_id` is None or when the problem returns no solution
        object for that id.
        """
        problem = self.get_problem(problem_key)
        if problem is None:
            return None

        solution_data = None
        if solution_id is not None:
            solution = problem.get_solution(solution_id)
            # Guard against a lookup that yields no solution object instead
            # of failing with AttributeError on None.
            if solution is not None:
                solution_data = solution.get_d3json()

        return {
            'problem': problem.get_d3json(),
            'solution': solution_data,
        }

    @staticmethod
    def _load_adccli_credentials():
        """Read the adccli login settings from the JSON credentials file."""
        with open('path-to-adccli-login', 'r') as fp:
            return json.load(fp)

    def call_api(self, method, cmd, params):
        """Dispatch the API command `cmd` with its `params`.

        `method` is accepted for interface compatibility and is not used.

        Return types differ by command: most commands return a dict, the
        'adccli/*' commands return a JSON-encoded string, and an unknown
        command returns None.
        """
        if cmd == 'role':
            # Report this server's role.
            return {'role': self.type}

        if cmd == 'problem/solve':
            # Distribute the problem named in params['problem'] to the
            # workers.
            return self.distribute_problem(params['problem'],
                                           params.get('solvers'))

        if cmd == 'problem/solution':
            self.store_solution(params)
            return {'status': 'received'}

        if cmd == 'problem/save':
            return self.save_solution(params['problem'])

        if cmd == 'request/status':
            # Request ids are time.time() floats, so convert before lookup.
            return self.get_request_status(float(params['request_id']))

        if cmd == 'stop':
            self.worker_manager.request_stop()
            return {}

        if cmd == 'view':
            # 'solution' is optional: get_viewer_data treats None as
            # "problem only".
            return self.get_viewer_data(params['problem'],
                                        params.get('solution'))

        if cmd == 'adccli/login':
            d = self._load_adccli_credentials()
            r = adccli.login(d['url'], d['username'], d['password'])
            return json.dumps({'message': r})

        if cmd == 'adccli/logout':
            r = adccli.logout()
            return json.dumps({'message': r})

        if cmd == 'adccli/whoami':
            d = self._load_adccli_credentials()
            r = adccli.whoami()
            return json.dumps({'message': r, 'status': r == d['username']})

        if cmd == 'adccli/download-problem':
            out_abspath = os.path.abspath(self.config['problem_path'])
            r = adccli.get_q_all(out_abspath)
            # Pick up the freshly downloaded problem files.
            self._load_problems(self.config['problem_path'])
            return json.dumps({'message': r})

        if cmd == 'adccli/upload-solution':
            qname = params['problem']
            # FIXME(review): `questions` is not defined in the visible part
            # of this file, so this lookup raises NameError unless the name
            # is provided elsewhere. It probably needs to be expressed in
            # terms of self.problems -- confirm against the Problem API.
            if "best_json" not in questions[qname]:
                res = {'message': "Required to 'save' before submission"}
            else:
                # TODO: the actual upload is disabled; the previous
                # implementation is kept below for reference.
                # out_path = "{}/{}".format(self.config['solution_path'], "submit")
                # qnumber = qname.replace("NL_Q", "").replace(".txt", "")
                # ans_file_path = "{}/T01_A{}.txt".format(out_path, qnumber)
                # ans_file_abspath = os.path.abspath(ans_file_path)
                # r = adccli.put_a(qnumber, ans_file_abspath)
                # mes = r + "\n"
                # json_name = questions[qname]['best_json']
                # data = questions[qname]['answers'][json_name]
                # r = adccli.put_a_info(qnumber, data['cputime'], "0", "Test")
                # mes += r
                mes = ''
                res = {'message': mes}
            return json.dumps(res)

        return None


class WorkerManager(object):
    """Registry of workers plus fan-out of commands to all of them."""

    def __init__(self, host_address):
        self.workers = dict()
        self.host = host_address

    def add_worker(self, conf):
        """Register a worker built from a copy of `conf`.

        The host address is added to the copy under 'host'; the worker is
        stored under its 'address'.
        """
        worker_conf = dict(conf)
        worker_conf['host'] = self.host
        self.workers[worker_conf['address']] = Worker(worker_conf)

    def get_workers(self):
        """Return the dict of workers keyed by address."""
        return self.workers

    def request_stop(self):
        """Send the 'stop' command to every worker."""
        self.broadcast('stop', {})

    def broadcast(self, cmd, params, solvers=None):
        """Post `cmd` with `params` to every worker, one daemon thread each.

        The call returns as soon as the threads are started; it does not
        wait for the posts to complete.

        NOTE(review): `solvers` is accepted but not used here -- confirm
        whether it is meant to restrict the recipients or be forwarded.
        """
        for worker in self.workers.values():
            sender = threading.Thread(name=worker.address,
                                      target=worker.post,
                                      args=(cmd, params),
                                      daemon=True)
            sender.start()


class Worker(object):
    """Client-side handle for one remote worker reachable over HTTP."""

    def __init__(self, params):
        """Store the worker settings and push them to the worker.

        `params` must contain 'address', 'name', 'host' and 'role'.
        """
        self.address = params['address']
        self.name = params['name']
        self.host = params['host']
        self.role = params['role']
        self.params = params
        self.status = 'Ready'
        self.configure()

    def post(self, path, data):
        """POST `data` as JSON to the worker's `/api/<path>` endpoint.

        Returns the `requests` response, or None when the post failed, in
        which case the worker is marked 'Not connected'.
        """
        try:
            response = requests.post(
                f'http://{self.address}/api/{path}',
                json.dumps(data),
                headers={'Content-Type': 'application/json'},
                timeout=2)
        except Exception:
            # Deliberately best-effort: an unreachable worker must not stop
            # the host or the other sender threads.
            print(f"W: Failed to connect {self.address}")
            self.status = 'Not connected'
            return None

        print(f"I: Post to {self.address}, API Cmd: {path}")
        # A worker that was unreachable earlier is reachable again; without
        # this reset it would be reported as 'Not connected' forever.
        if self.status == 'Not connected':
            self.status = 'Ready'
        return response

    def configure(self):
        """Send this worker's settings to its 'role' endpoint."""
        self.post('role', self.params)


class Request(object):
    """One problem broadcast to the workers, plus the responses received."""

    def __init__(self, worker_manager, data, timeout=10000, solvers=None):
        self.worker_manager = worker_manager
        self.data = data
        self.timeout = timeout
        # The creation timestamp doubles as the request identifier.
        self.request_id = time.time()
        self.broadcast_time = None
        self.solvers = solvers
        self.response = dict()

    @property
    def request_data(self):
        """Payload sent to workers: the data plus 'request_id'/'timeout'.

        A shallow copy is returned so the dict handed in by the caller is
        not modified.
        """
        data = dict(self.data)
        data['request_id'] = self.request_id
        data['timeout'] = self.timeout
        return data

    def get_id(self):
        """Return the request identifier."""
        return self.request_id

    def get_dict(self):
        """Return the payload dict (same as `request_data`)."""
        return self.request_data

    def store_response(self, data):
        """Store a worker's response under data['worker']."""
        # TODO: handle the case where the same worker returns two or more
        # answers for a single request_id (currently the last one wins).
        self.response[data['worker']] = data

    def get_status(self):
        """Summarise the progress of this request.

        Returns a dict with:
          'status'        -- 'done' once every registered worker has
                             responded, otherwise the elapsed time as 'M:SS'
          'workers'       -- number of registered workers
          'solutions'     -- number of registered workers that responded
          'progress'      -- 0..100, the larger of the response ratio and
                             the elapsed share of the timeout; 0 before the
                             request has been broadcast
          'worker_status' -- per-worker status string keyed by address
        """
        workers = self.worker_manager.get_workers()
        worker_count = len(workers)
        response_count = sum(1 for address in workers
                             if address in self.response)

        # Before the broadcast there is no start time; treat that as zero
        # elapsed time instead of subtracting from None.
        if self.broadcast_time is None:
            elapsed_time = 0.0
        else:
            elapsed_time = time.time() - self.broadcast_time

        if worker_count == response_count:
            status = 'done'
        else:
            et_minutes = int(elapsed_time // 60)
            et_seconds = int(elapsed_time % 60)
            status = f'{et_minutes}:{et_seconds:02}'

        if self.broadcast_time is None:
            progress = 0
        else:
            # No workers means nothing is pending; a missing or zero timeout
            # counts as already expired. Both guards avoid division by zero.
            if worker_count:
                progress_problem = response_count / worker_count * 100
            else:
                progress_problem = 100
            if self.timeout:
                progress_time = elapsed_time / self.timeout * 100
            else:
                progress_time = 100
            progress = min(100, max(progress_problem, progress_time))

        worker_status = dict()
        for address, worker in workers.items():
            if address in self.response:
                worker_status[address] = self.response[address]['status']
            elif worker.status == 'Not connected':
                worker_status[address] = 'Not connected'
            else:
                worker_status[address] = 'Waiting for response'

        return {
            'status': status,
            'workers': worker_count,
            'solutions': response_count,
            'progress': progress,
            'worker_status': worker_status,
        }

    def broadcast(self):
        """Send the 'solve' command with this request's payload to all
        workers and record the time of the broadcast."""
        # Stamp the start time first so a status query that arrives while
        # the sender threads are starting already sees a valid time.
        self.broadcast_time = time.time()
        self.worker_manager.broadcast('solve', self.request_data,
                                      solvers=self.solvers)