import importlib import json import os import requests import sys import time import threading import traceback from collections import OrderedDict from utils import Problem class Solver(object): def __init__(self, config): self.type = 'solver' self.host = config['host'] self.address = config['address'] self.name = config['name'] self.partial_mode = config['partial_mode'] self.solver = importlib.import_module(f"solvers.{config['solver']}") self.queue = OrderedDict() self.solving = None self.status = None self.thread = threading.Thread(name='solver', target=self.solver_thread, daemon=True) self.thread.start() self.set_status('Ready') def __del__(self): self.stop_solver() def __repr__(self): return "Solver" def set_status(self, status, params=dict()): self.status = status self.post('worker/status', {'address': self.address, 'status': self.status, 'params': params}) def solver_thread(self): while True: if len(self.queue) > 0: print("I: Solver started") _, params = self.queue.popitem(last=False) self.solving = params self.set_status(f'Running ({len(self.queue)} in queue)', params={'request_id': params['request_id']}) try: self.solve(params) except Exception as e: print("E: An error has occurred in the solver thread") print(traceback.format_exc()) self.solving = None self.set_status('Ready') else: time.sleep(0.5) def solve(self, params): if self.partial_mode and len(params['group_problems']) > 1: for i, (gproblem, gline_map, gblock_map) in enumerate(params['group_problems']): data = params.copy() data['problem'] = gproblem start_time = time.time() solution = self.solver.solve(data) end_time = time.time() elapsed_time = end_time - start_time if not 'elapsed_time' in solution: solution['elapsed_time'] = elapsed_time solution['part_id'] = i solution['line_map'] = gline_map solution['block_map'] = gblock_map self.submit_solution(data, solution) if self.solver.stop_flag: break else: start_time = time.time() solution = self.solver.solve(params) end_time = time.time() elapsed_time = end_time - start_time if not 'elapsed_time' in solution: solution['elapsed_time'] = elapsed_time self.submit_solution(params, solution) return True def post(self, path, data): try: response = requests.post( f'http://{self.host}/api/{path}', json.dumps(data), headers={'Content-Type': 'application/json'}) print(f"I: Post to {self.host}, API Cmd: {path}") except Exception as e: print(f"W: Failed to connect to the host") response = None return response def submit_solution(self, params, solution): data = { 'request_id': params['request_id'], 'problem': params['name'], 'worker': self.address, 'merge_solvers': params['merge_solvers'] } data.update(solution) self.post('problem/solution', data) def start_solver(self, params): print("I: Problem queued") _id = params['request_id'] self.queue[_id] = params self.set_status(f'Running ({len(self.queue)} in queue)') return {'status': self.status} def stop_solver(self): # if self.thread is not None: # self.solver.stop() self.solver.stop() return {'status': self.status} def cancel_queue(self, params): if 'request_id' in params: request_id = params['request'] for k, v in self.queue.items(): if k == request_id: self.queue.pop(k, None) if self.solving is not None and self.solving['request_id'] == request_id: self.solver.stop() elif 'problem' in params: problem_name = params['problem'] for k, v in self.queue.items(): if v['name'] == problem_name: self.queue.pop(k, None) if self.solving is not None and self.solving['name'] == problem_name: self.solver.stop() return {'status': 'canceled'} def call_api(self, method, cmd, params): if cmd == 'role': return {'role': self.type} elif cmd == 'solve': return self.start_solver(params) elif cmd == 'stop': return self.stop_solver() elif cmd == 'cancel': return self.cancel_queue(params) elif cmd == 'status': return {'status': self.status} else: return None