host.py 18.3 KB
Newer Older
Kento HASEGAWA's avatar
Kento HASEGAWA committed
1 2
import glob
import json
3
import os
4 5 6
import sys
import threading
import time
Kento HASEGAWA's avatar
Kento HASEGAWA committed
7 8
import requests

9
from utils import Problem, GroupPart
10
import utils.adcclilib as adccli
11
# import adc2019.client.adcclient
Kento HASEGAWA's avatar
Kento HASEGAWA committed
12 13 14 15 16 17 18

class Host(object):
    """Coordinator that distributes problems to workers and collects solutions.

    Registries kept on the instance:
      * ``problems``: problem name -> ``Problem`` loaded from ``config['problem_path']``
      * ``request``: request id -> ``Request`` created by :meth:`distribute_problem`
      * ``problem_to_request``: problem name -> list of request ids issued for it
    """

    def __init__(self, config):
        self.type = 'host'

        self.config = config
        self.problems = dict()
        self.worker_manager = None
        self.request = dict()
        self.problem_to_request = dict()

        self._load_problems(config['problem_path'])
        self._setup_workers()

    def __repr__(self):
        return "Host"

    def _load_problems(self, path):
        """Load (or reload) every problem file matching the glob pattern ``path``.

        Problems are keyed by ``Problem.name`` and visited in sorted path order.
        A reload replaces the ``Problem`` objects but keeps the request ids
        already recorded in ``problem_to_request``.
        """
        for problem_path in sorted(glob.glob(path)):
            problem = Problem(problem_path, self.config['solution_path'])
            key = problem.name
            self.problems[key] = problem
            # setdefault: reloading (adccli/download-problem) must not drop the
            # history of requests already issued for this problem.
            self.problem_to_request.setdefault(key, list())

    def _setup_workers(self):
        """Create the WorkerManager and register every entry of ``config['worker']``."""
        self.worker_manager = WorkerManager(self.config['address'])
        for worker_conf in self.config['worker']:
            self.worker_manager.add_worker(worker_conf)

    def get_workers(self):
        """Return the address -> Worker mapping held by the worker manager."""
        return self.worker_manager.get_workers()

    def distribute_problem(self, problem_key, solvers=None):
        """Send problem ``problem_key`` to the selected solver workers.

        One ``Request`` is created and broadcast per selected worker whose role
        is not ``'merge_solver'``. The addresses of selected merge-solver
        workers are attached to every request as ``merge_solvers``.

        Args:
            problem_key: name of a loaded problem.
            solvers: optional collection of worker addresses to restrict to;
                ``None`` selects every registered worker.

        Returns:
            ``{'status': 'key error'}`` for an unknown problem, otherwise
            ``{'status': 'processed'}``.
        """
        problem = self.get_problem(problem_key)
        if problem is None:
            return {'status': 'key error'}

        merge_solvers = list()
        target_workers = dict()
        for address, worker in self.worker_manager.get_workers().items():
            if (solvers is not None) and (address not in solvers):
                continue
            if worker.role == 'merge_solver':
                merge_solvers.append(address)
            else:
                target_workers[address] = worker

        request_ids = self.problem_to_request.setdefault(problem_key, list())
        for worker in target_workers.values():
            request = Request(self.worker_manager, problem.get_dict(), worker, merge_solvers=merge_solvers)
            request.broadcast()

            request_id = request.get_id()
            self.request[request_id] = request
            request_ids.append(request_id)

        return {'status': 'processed'}

    def get_problems(self):
        """Return the problem name -> Problem mapping."""
        return self.problems

    def get_problem(self, _key):
        """Return the problem named ``_key``, or ``None`` if it is not loaded."""
        return self.problems.get(_key)

    def store_solution(self, solution):
        """Record a solution reported by a worker.

        * Partial solutions (``'part_id'`` present) are stored on their
          ``Request`` and ``Problem``; once the problem accepts the part and all
          parts of the request have answered, the problem's
          ``partial_merge_problem`` is posted to the merge solver(s).
        * Full solutions are stored on their ``Request`` and ``Problem``.

        Returns:
            ``{'status': ...}`` with ``'ignored'`` (failed part), ``'done'``
            (merge dispatched), ``'registered'`` (stored) or ``'error'``
            (unknown problem, or the problem rejected the part).
        """
        if 'part_id' in solution:
            return self._store_partial_solution(solution)

        request_id = solution['request_id']
        request = self.request.get(request_id)
        if request is not None:
            request.store_response(solution)
        else:
            print(f'W: Unknown request_id: {request_id}')

        problem = self.get_problem(solution['problem'])
        if problem is None:
            return {'status': 'error'}
        problem.put_solution(solution)
        print(solution['solution'])
        return {'status': 'registered'}

    def _store_partial_solution(self, solution):
        """Handle one per-part result; see :meth:`store_solution`."""
        request_id = solution['request_id']
        request = self.request.get(request_id)

        if solution['status'] != 'done':
            if request is not None:
                request.store_failed(solution)
            return {'status': 'ignored'}

        problem_key = solution['problem']
        if request is not None:
            request.store_response(solution)
        else:
            print(f'W: Unknown request_id: {request_id}')

        problem = self.get_problem(problem_key)
        parts_valid = problem.put_partial_solution(solution) if problem is not None else False

        print(solution['solution'])
        print(solution['line_map'], solution['block_map'])

        # Completeness can only be checked for a known request; an unknown id
        # must not raise here.
        if parts_valid and request is not None and request.is_receive_completed:
            self._send_merge_problem(problem, solution)
            return {'status': 'done'}
        return {'status': 'registered'} if parts_valid else {'status': 'error'}

    def _send_merge_problem(self, problem, solution):
        """Post ``problem.partial_merge_problem`` to the merge solver(s).

        If the solution names non-empty ``merge_solvers``, only those worker
        addresses receive it; otherwise every worker with role
        ``'merge_solver'`` does.
        """
        merge_problem = problem.partial_merge_problem
        merge_problem['request_id'] = solution['request_id']
        merge_problem['timeout'] = 10000
        # `or []` also covers an explicit None, which len() would reject.
        targets = solution.get('merge_solvers') or []
        for address, worker in self.worker_manager.workers.items():
            if targets:
                if address in targets:
                    worker.post('solve', merge_problem)
            elif worker.role == 'merge_solver':
                worker.post('solve', merge_problem)

    def save_solution(self, problem_key):
        """Save the best solution of ``problem_key``; return a status dict."""
        problem = self.get_problem(problem_key)
        if problem is None:
            return {'status': 'failed'}
        problem.save_best_solution()
        return {'status': 'saved'}

    def get_request_status(self, request_id):
        """Return the status dict of request ``request_id``."""
        request = self.request.get(request_id)
        if request is None:
            return {'status': 'unknown request'}
        return request.get_status()

    def get_request_by_problem(self, problem_key):
        """Return status dicts of every request for ``problem_key``, newest first.

        Returns a list (reusable and JSON-serializable), or ``None`` for an
        unknown problem.
        """
        if problem_key not in self.problem_to_request:
            return None
        statuses = [self.request[rid].get_status() for rid in self.problem_to_request[problem_key]]
        statuses.reverse()
        return statuses

    def get_viewer_data(self, problem_key, solution_id):
        """Return ``{'problem': ..., 'solution': ...}`` d3 JSON for the viewer.

        Returns ``None`` for an unknown problem; ``solution`` is ``None`` when
        no solution id is given or no solution is found for it.
        """
        problem = self.get_problem(problem_key)
        if problem is None:
            return None
        problem_data = problem.get_d3json()

        solution_data = None
        if solution_id is not None:
            solution = problem.get_solution(solution_id)
            # NOTE(review): assumes get_solution may return None for an unknown
            # id; guarded so the problem view still renders.
            if solution is not None:
                solution_data = solution.get_d3json()

        return {
            'problem': problem_data,
            'solution': solution_data
        }

    def call_api(self, method, cmd, params):
        """Dispatch API command ``cmd`` with request parameters ``params``.

        ``method`` is accepted for interface compatibility but not used.
        Returns a response dict, or ``None`` for an unknown command.
        """
        if cmd == 'role':
            # Report this server's role.
            return {'role': self.type}
        elif cmd == 'problem/solve':
            # Distribute the problem named by params['problem'] to the workers.
            return self.distribute_problem(params['problem'], params.get('solvers'))
        elif cmd == 'problem/solution':
            self.store_solution(params)
            return {'status': 'received'}
        elif cmd == 'problem/save':
            return self.save_solution(params['problem'])
        elif cmd == 'request/status':
            # Request ids are time.time() floats; the client may send a string.
            request_id = float(params['request_id'])
            return self.get_request_status(request_id)
        elif cmd == 'stop':
            self.worker_manager.request_stop()
            return {}
        elif cmd == 'cancel':
            self.worker_manager.request_cancel(params)
            for request_id in self.problem_to_request.get(params['problem'], []):
                request = self.request.get(request_id)
                if request is not None:
                    request.cancel()
            return {'status': 'canceled'}
        elif cmd == 'worker/reset':
            self.worker_manager.reset_worker(params['address'])
            return {'status': 'reset'}
        elif cmd == 'worker/stop':
            self.worker_manager.request_stop(params['address'])
            return {'status': 'stopped'}
        elif cmd == 'worker/status':
            self.worker_manager.update_status(params['address'], params['status'])
            status_params = params.get('params') or {}
            if 'request_id' in status_params:
                request = self.request.get(status_params['request_id'])
                if request is not None:
                    request.set_running()
            return {'status': 'updated'}
        elif cmd == 'view':
            return self.get_viewer_data(params['problem'], params.get('solution'))
        elif cmd == 'adccli/login':
            r = adccli.login(self.config['adccli']['url'], self.config['adccli']['username'], self.config['adccli']['password'])
            return {'status': r}
        elif cmd == 'adccli/logout':
            r = adccli.logout()
            return {'status': r}
        elif cmd == 'adccli/whoami':
            r = adccli.whoami().strip()
            return {'status': r, 'is_logged_in': r == self.config['adccli']['username']}
        elif cmd == 'adccli/download-problem':
            out_abspath = os.path.abspath(os.path.dirname(self.config['problem_path']))
            r = adccli.get_q_all(out_abspath)
            self._load_problems(self.config['problem_path'])
            return {'status': r}
        elif cmd == 'adccli/upload-solution':
            problem_key = params['problem']
            problem = self.get_problem(problem_key)
            if problem is None:
                return {'status': 'key error'}
            if problem.best_solution is None:
                return {'status': "Required to 'save' before submission"}

            # NOTE(review): assumes 'problem/save' wrote the submission file at
            # this path — confirm against Problem.save_best_solution.
            solution_path = f"{self.config['solution_path']}/submit/{problem_key}.txt"
            qnumber = problem_key.replace("Q", "").replace(".txt", "")
            solution_abspath = os.path.abspath(solution_path)

            mes = adccli.put_a(qnumber, solution_abspath) + "\n"

            best_solution = problem.get_solution(problem.best_solution)
            cputime = f'{best_solution.elapsed_time:.3f}'
            info_mes = f'Solver: {best_solution.worker}, RID: {best_solution.request_id}'
            mes += adccli.put_a_info(qnumber, cputime, "0", info_mes)
            return {'status': mes}
        else:
            return None

class WorkerManager(object):
    """Registry of Worker objects keyed by address, with fan-out helpers."""

    def __init__(self, host_address):
        self.workers = dict()
        # Host address; copied into each worker's configuration as 'host'.
        self.host = host_address

    def add_worker(self, conf):
        """Create and register a Worker from ``conf`` (must contain 'address').

        ``conf`` is copied, not mutated; the copy gains a 'host' entry.
        """
        worker_conf = dict(conf)
        worker_conf['host'] = self.host
        self.workers[worker_conf['address']] = Worker(worker_conf)

    def get_workers(self):
        """Return the address -> Worker mapping."""
        return self.workers

    def update_status(self, address, status):
        """Set the cached status of the worker at ``address``."""
        self.workers[address].status = status

    def reset_worker(self, address):
        """Re-send configuration to the worker at ``address``."""
        self.workers[address].configure()

    def request_stop(self, address=None):
        """Ask one worker (by ``address``) or, if ``None``, every worker to stop."""
        if address is None:
            self.broadcast('stop', {})
        else:
            self.workers[address].post('stop', {})

    def request_cancel(self, params):
        """Broadcast a 'cancel' command carrying ``params`` to every worker."""
        self.broadcast('cancel', params)

    def broadcast(self, cmd, params, solvers=None):
        """Post ``cmd``/``params`` to workers in parallel, one daemon thread each.

        Args:
            cmd: API path passed to ``Worker.post``.
            params: payload passed to ``Worker.post``.
            solvers: optional collection of addresses to restrict to;
                ``None`` targets every worker.

        Returns:
            The started threads. This method does not wait for them; callers
            may ``join`` them if they need completion.
        """
        threads = []
        for address, worker in self.workers.items():
            if solvers is not None and address not in solvers:
                continue
            thread = threading.Thread(name=worker.address, target=worker.post, args=(cmd, params), daemon=True)
            thread.start()
            threads.append(thread)
        return threads

class Worker(object):
    """Handle for one remote worker, reached over HTTP at ``http://<address>/api/``."""

    def __init__(self, params):
        self.address = params['address']
        self.name = params['name']
        self.host = params['host']
        self.role = params['role']
        self.partial_mode = params.get('partial_mode', False)
        self.params = params
        self.status = 'Ready'

        # Push the configuration right away; if that fails, post() marks the
        # worker 'Not connected'.
        self.configure()

    def update_status(self):
        """Query the worker's 'status' API and cache the reported status.

        A failed request, or a reply that is not JSON with a 'status' key,
        leaves the worker marked 'Not connected'.
        """
        res = self.post('status', None)
        if res is None:
            self.status = 'Not connected'
            return
        try:
            self.status = res.json()['status']
        except (ValueError, KeyError, TypeError):
            # Reachable, but did not answer with the expected {'status': ...}.
            self.status = 'Not connected'

    def post(self, path, data):
        """POST ``data`` serialized as JSON to ``http://<address>/api/<path>``.

        Uses a 2-second timeout. Returns the response object (its HTTP status
        code is not checked), or ``None`` on any failure, in which case the
        worker is marked 'Not connected'.
        """
        try:
            response = requests.post(
                f'http://{self.address}/api/{path}',
                json.dumps(data),
                headers={'Content-Type': 'application/json'},
                timeout=2)
        except Exception:
            # Best-effort: report and record the failure instead of raising
            # into the (often background) caller.
            print(f"W: Failed to connect {self.address}")
            self.status = 'Not connected'
            return None
        print(f"I: Post to {self.address}, API Cmd: {path}")
        return response

    def configure(self):
        """Send this worker's parameters to its 'role' API; return the response (or ``None``)."""
        return self.post('role', self.params)

class Request(object):
    """One problem submission to a single solver worker, with progress tracking.

    ``solved`` holds one slot per partial problem (partial mode only) plus a
    final slot for the overall result; each slot is 0 (pending), 1 (done) or
    -1 (failed / canceled).
    """

    # Guards _last_id so ids handed out by _new_request_id are strictly increasing.
    _id_lock = threading.Lock()
    _last_id = 0.0

    def __init__(self, worker_manager, data, worker, timeout=10000, merge_solvers=None):
        # worker_manager is accepted for interface compatibility; it is not used here.
        self.data = data
        self.timeout = timeout

        self.request_id = self._new_request_id()
        self.request_time = None
        self.start_time = None
        self.done_time = None

        self.solver = worker
        # Fresh list per request instead of a shared mutable default.
        self.merge_solvers = list() if merge_solvers is None else merge_solvers

        self.response = list()

        if self.solver.partial_mode:
            self.part_nums = len(data['group_problems'])
            self.solved = [0] * (self.part_nums + 1)
        else:
            self.part_nums = 1
            self.solved = [0]

        self.is_processed = False

    @classmethod
    def _new_request_id(cls):
        """Return a time.time()-based float id, strictly greater than any earlier one.

        Requests created back-to-back can observe the same time.time() value
        on coarse clocks; bumping the id keeps them from overwriting each other
        in a table keyed by id.
        """
        with cls._id_lock:
            request_id = time.time()
            if request_id <= cls._last_id:
                request_id = cls._last_id + 1e-6
            cls._last_id = request_id
            return request_id

    @property
    def request_data(self):
        """Payload for the solver: a shallow copy of ``data`` plus request metadata.

        Copying keeps requests built from the same problem dict from
        overwriting each other's 'request_id'.
        """
        data = dict(self.data)
        data['request_id'] = self.request_id
        data['timeout'] = self.timeout
        data['merge_solvers'] = self.merge_solvers
        return data

    @property
    def is_receive_completed(self):
        """True once every partial slot (all but the last) is no longer pending."""
        return all(v != 0 for v in self.solved[:-1])

    def get_id(self):
        """Return the request id."""
        return self.request_id

    def get_dict(self):
        """Return the payload sent to the solver (see ``request_data``)."""
        return self.request_data

    def set_running(self):
        """Mark the request as being processed; record the first start time."""
        self.is_processed = True
        if self.start_time is None:
            self.start_time = time.time()

    def cancel(self):
        """Mark a still-pending request as canceled (overall slot -1, no done_time)."""
        if self.solved[-1] == 0:
            self.solved[-1] = -1

    def _close_if_parts_failed(self):
        # When every part has answered and at least one failed, the request as
        # a whole is marked failed.
        parts = self.solved[:-1]
        if all(v != 0 for v in parts) and any(v < 0 for v in parts):
            self.solved[-1] = -1
            self.done_time = time.time()

    def store_failed(self, data):
        """Record a failure report (per part if ``data`` has 'part_id', else overall)."""
        self.response.append(data)
        if 'part_id' in data:
            self.solved[data['part_id']] = -1
            self._close_if_parts_failed()
        else:
            self.solved[-1] = -1
            self.done_time = time.time()

    def store_response(self, data):
        """Record a response; ``data['status'] == 'done'`` counts as success."""
        self.response.append(data)
        print(data)
        if 'part_id' in data:
            idx = data['part_id']
            print(idx)
            self.solved[idx] = 1 if data['status'] == 'done' else -1
            self._close_if_parts_failed()
        else:
            self.solved[-1] = 1 if data['status'] == 'done' else -1
            self.done_time = time.time()

    @staticmethod
    def _format_elapsed(seconds):
        """Format a duration as 'M:SS'; negative durations are clamped to 0."""
        seconds = max(0.0, seconds)
        return f'{int(seconds // 60)}:{int(seconds % 60):02}'

    def _started_at(self):
        # If the worker never reported running, fall back to the send time and
        # then to the completion time, rather than "now", which would give a
        # negative duration for an already finished request.
        if self.start_time is not None:
            return self.start_time
        if self.request_time is not None:
            return self.request_time
        return self.done_time

    def _progress(self):
        counter = sum(v != 0 for v in self.solved)
        fail_counter = sum(v < 0 for v in self.solved)
        return counter, fail_counter

    def get_status(self):
        """Return ``{'status': <human-readable status>, 'worker': <solver address>}``."""
        overall = self.solved[-1]

        if overall == 1:
            elapsed = self._format_elapsed(self.done_time - self._started_at())
            status = f'done ({overall}) [{elapsed}]'
        elif overall == -1:
            counter, fail_counter = self._progress()
            progress = f'{counter}<!{fail_counter}>/{len(self.solved)} done'
            if self.done_time is None:
                # Only cancel() sets -1 without a done_time.
                status = f'canceled ({progress})'
            else:
                elapsed = self._format_elapsed(self.done_time - self._started_at())
                status = f'failed ({progress}) [{elapsed}]'
        elif self.solver.status == 'Not connected':
            status = 'Not connected'
        elif not self.is_processed:
            status = 'Queued'
        else:
            if len(self.solved) == 1:
                status_mes = 'Running'
            else:
                counter, fail_counter = self._progress()
                if fail_counter > 0:
                    status_mes = f'{counter}<!{fail_counter}>/{len(self.solved)}'
                else:
                    status_mes = f'{counter}/{len(self.solved)}'
            if self.start_time is None:
                self.start_time = time.time()
            status = f'{status_mes} [{self._format_elapsed(time.time() - self.start_time)}]'

        return {
            'status': status,
            'worker': self.solver.address
        }

    def broadcast(self):
        """Post the payload to the solver's 'solve' API from a daemon thread.

        ``request_time`` is recorded before the thread starts, so it is set by
        the time this method returns.
        """
        self.request_time = time.time()
        sender = threading.Thread(
            name=f'request_sender_{self.solver.address}',
            target=self.solver.post,
            args=('solve', self.request_data),
            daemon=True)
        sender.start()