diff --git a/main.py b/main.py index 051d0398e6ad139ea14f3a081e69f48a3867fc73..2e6a5f9eb9fe32e99ab9b15f2c59e3d98bfdec88 100644 --- a/main.py +++ b/main.py @@ -5,7 +5,7 @@ import argparse from flask import abort, Flask, g, jsonify, render_template, request # from gevent import pywsgi, monkey # from geventwebsocket.handler import WebSocketHandler -from queue import Queue +# from queue import Queue import adc2019system diff --git a/roles/host.py b/roles/host.py index f0e6b39855d80c2ff96f3ad832d38c465dca94cd..9d1e3e016952b47f6e3354e17035bd67a0397fb0 100644 --- a/roles/host.py +++ b/roles/host.py @@ -146,6 +146,9 @@ class Host(object): elif cmd == 'stop': self.worker_manager.request_stop() return {} + elif cmd == 'worker/status': + self.worker_manager.update_status(params['address'], params['status']) + return {'status': 'updated'} elif cmd == 'view/solution': problem_key = params['problem'] solution_id = params['solution'] @@ -200,7 +203,17 @@ class WorkerManager(object): def __init__(self, host_address): self.workers = dict() self.host = host_address + + # self.heartbeat_thread = threading.Thread(name='heartbeat', target=self.heartbeat, daemon=True) + # self.heartbeat_thread.start() + # def heartbeat(self): + + # while True: + # for k, v in self.workers.items(): + # v.update_status() + # time.sleep(1) + def add_worker(self, conf): worker_conf = dict() @@ -212,6 +225,9 @@ class WorkerManager(object): def get_workers(self): return self.workers + + def update_status(self, address, status): + self.workers[address].status = status def request_stop(self): self.broadcast('stop', {}) @@ -236,10 +252,17 @@ class Worker(object): self.host = params['host'] self.role = params['role'] self.params = params - self.status = 'Ready' + self.status = 'Not ready' self.configure() + def update_status(self): + res = self.post('status', None) + if res is None: + self.status = 'Not connected' + else: + self.status = res.json()['status'] + def post(self, path, data): try: response = requests.post( @@ -249,7 +272,6 @@ class Worker(object): timeout=2) print(f"I: Post to {self.address}, API Cmd: {path}") except Exception as e: - # sys.stderr.write(str(e) + "\n") print(f"W: Failed to connect {self.address}") self.status = 'Not connected' response = None diff --git a/roles/solver.py b/roles/solver.py index 944b70017fa1b8e094533e0da64a9a5098c94067..69427077f56013b9f3e942dc3d6a88528a530a28 100644 --- a/roles/solver.py +++ b/roles/solver.py @@ -19,11 +19,13 @@ class Solver(object): self.solver = importlib.import_module(f"solvers.{config['solver']}") self.queue = OrderedDict() - self.status = 'Ready' + self.status = None # self.thread = None self.thread = threading.Thread(name='solver', target=self.solver_thread, daemon=True) self.thread.start() + + self.set_status('Ready') def __del__(self): self.stop_solver() @@ -31,16 +33,24 @@ class Solver(object): def __repr__(self): return "Solver" + def set_status(self, status): + self.status = status + self.post('worker/status', {'address': self.address, 'status': self.status}) + def solver_thread(self): while True: if len(self.queue) > 0: print("I: Solver started") - self.status = 'running' _, params = self.queue.popitem(last=False) + # self.status = f'Running ({len(self.queue)} in queue)' + self.set_status(f'Running ({len(self.queue)} in queue)') self.solve(params) + # self.status = 'Ready' + self.set_status('Ready') else: - self.status = 'ready' + # self.status = 'Ready' + self.set_status('Ready') time.sleep(0.5) def solve(self, params): @@ -59,11 +69,15 @@ class Solver(object): return True def post(self, path, data): - response = requests.post( - f'http://{self.host}/api/{path}', - json.dumps(data), - headers={'Content-Type': 'application/json'}) - print(f"I: Post to {self.host}, API Cmd: {path}") + try: + response = requests.post( + f'http://{self.host}/api/{path}', + json.dumps(data), + headers={'Content-Type': 'application/json'}) + print(f"I: Post to {self.host}, API Cmd: {path}") + except Exception as e: + print(f"W: Failed to connect to the host") + response = None return response def submit_solution(self, params, solution): @@ -77,25 +91,19 @@ class Solver(object): def start_solver(self, params): - # if self.thread is None: - # print("I: Solver started") - # self.thread = threading.Thread(name='solver', target=self.solve, args=(params, ), daemon=True) - # self.thread.start() - # return {'status': 'started'} - # else: - # return {'status': 'busy'} - print("I: Problem queued") _id = params['request_id'] self.queue[_id] = params - return {'status': 'queued'} + # self.status = f'Running ({len(self.queue)} in queue)' + self.set_status(f'Running ({len(self.queue)} in queue)') + return {'status': self.status} def stop_solver(self): if self.thread is not None: self.solver.stop() - return {'status': 'stopped'} + return {'status': self.status} def call_api(self, method, cmd, params): if cmd == 'role': @@ -105,6 +113,6 @@ class Solver(object): elif cmd == 'stop': return self.stop_solver() elif cmd == 'status': - return self.status + return {'status': self.status} else: return None