diff --git a/main.py b/main.py index 2582dac5faac7afcba3bfd54bcf8a2cfd603e014..89ee48924c538b192f514f0beebee60741185bad 100644 --- a/main.py +++ b/main.py @@ -5,7 +5,7 @@ import argparse from flask import abort, Flask, g, jsonify, render_template, request # from gevent import pywsgi, monkey # from geventwebsocket.handler import WebSocketHandler -from queue import Queue +# from queue import Queue import adc2019system diff --git a/roles/host.py b/roles/host.py index 3c1840cf51ef52f49c56eb5008d764c75d2de80b..5bd84f79aca23fdb1797b05c78a6e914b9e40054 100644 --- a/roles/host.py +++ b/roles/host.py @@ -147,6 +147,9 @@ class Host(object): elif cmd == 'stop': self.worker_manager.request_stop() return {} + elif cmd == 'worker/status': + self.worker_manager.update_status(params['address'], params['status']) + return {'status': 'updated'} elif cmd == 'view': problem_key = params['problem'] solution_id = params['solution'] @@ -201,7 +204,17 @@ class WorkerManager(object): def __init__(self, host_address): self.workers = dict() self.host = host_address + + # self.heartbeat_thread = threading.Thread(name='heartbeat', target=self.heartbeat, daemon=True) + # self.heartbeat_thread.start() + # def heartbeat(self): + + # while True: + # for k, v in self.workers.items(): + # v.update_status() + # time.sleep(1) + def add_worker(self, conf): worker_conf = dict() @@ -213,6 +226,9 @@ class WorkerManager(object): def get_workers(self): return self.workers + + def update_status(self, address, status): + self.workers[address].status = status def request_stop(self): self.broadcast('stop', {}) @@ -237,10 +253,17 @@ class Worker(object): self.host = params['host'] self.role = params['role'] self.params = params - self.status = 'Ready' + self.status = 'Not ready' self.configure() + def update_status(self): + res = self.post('status', None) + if res is None: + self.status = 'Not connected' + else: + self.status = res.json()['status'] + def post(self, path, data): try: response = requests.post( @@ -250,7 +273,6 @@ class Worker(object): timeout=2) print(f"I: Post to {self.address}, API Cmd: {path}") except Exception as e: - # sys.stderr.write(str(e) + "\n") print(f"W: Failed to connect {self.address}") self.status = 'Not connected' response = None diff --git a/roles/solver.py b/roles/solver.py index 890c67cc549b4856402232fd387e6b949b82f430..69427077f56013b9f3e942dc3d6a88528a530a28 100644 --- a/roles/solver.py +++ b/roles/solver.py @@ -6,6 +6,8 @@ import sys import time import threading +from collections import OrderedDict + class Solver(object): def __init__(self, config): @@ -15,7 +17,15 @@ class Solver(object): self.address = config['address'] self.name = config['name'] self.solver = importlib.import_module(f"solvers.{config['solver']}") - self.thread = None + self.queue = OrderedDict() + + self.status = None + + # self.thread = None + self.thread = threading.Thread(name='solver', target=self.solver_thread, daemon=True) + self.thread.start() + + self.set_status('Ready') def __del__(self): self.stop_solver() @@ -23,6 +33,26 @@ class Solver(object): def __repr__(self): return "Solver" + def set_status(self, status): + self.status = status + self.post('worker/status', {'address': self.address, 'status': self.status}) + + def solver_thread(self): + + while True: + if len(self.queue) > 0: + print("I: Solver started") + _, params = self.queue.popitem(last=False) + # self.status = f'Running ({len(self.queue)} in queue)' + self.set_status(f'Running ({len(self.queue)} in queue)') + self.solve(params) + # self.status = 'Ready' + self.set_status('Ready') + else: + # self.status = 'Ready' + self.set_status('Ready') + time.sleep(0.5) + def solve(self, params): start_time = time.time() @@ -35,15 +65,19 @@ class Solver(object): solution['elapsed_time'] = elapsed_time self.submit_solution(params, solution) - self.thread = None + # self.thread = None return True def post(self, path, data): - response = requests.post( - f'http://{self.host}/api/{path}', - json.dumps(data), - headers={'Content-Type': 'application/json'}) - print(f"I: Post to {self.host}, API Cmd: {path}") + try: + response = requests.post( + f'http://{self.host}/api/{path}', + json.dumps(data), + headers={'Content-Type': 'application/json'}) + print(f"I: Post to {self.host}, API Cmd: {path}") + except Exception as e: + print(f"W: Failed to connect to the host") + response = None return response def submit_solution(self, params, solution): @@ -57,21 +91,19 @@ class Solver(object): def start_solver(self, params): - if self.thread is None: - print("I: Solver started") - # self.thread = StoppableThread(target=self.solve, args=(params, )) - self.thread = threading.Thread(name='solver', target=self.solve, args=(params, ), daemon=True) - self.thread.start() - return {'status': 'started'} - else: - return {'status': 'busy'} + print("I: Problem queued") + _id = params['request_id'] + self.queue[_id] = params + # self.status = f'Running ({len(self.queue)} in queue)' + self.set_status(f'Running ({len(self.queue)} in queue)') + return {'status': self.status} def stop_solver(self): if self.thread is not None: self.solver.stop() - return {'status': 'stopped'} + return {'status': self.status} def call_api(self, method, cmd, params): if cmd == 'role': @@ -80,5 +112,7 @@ class Solver(object): return self.start_solver(params) elif cmd == 'stop': return self.stop_solver() + elif cmd == 'status': + return {'status': self.status} else: return None