Commit ffd55d63 authored by Kento HASEGAWA's avatar Kento HASEGAWA

Merge feature/backend

parents 59b27b52 9234c22e
...@@ -5,7 +5,7 @@ import argparse ...@@ -5,7 +5,7 @@ import argparse
from flask import abort, Flask, g, jsonify, render_template, request from flask import abort, Flask, g, jsonify, render_template, request
# from gevent import pywsgi, monkey # from gevent import pywsgi, monkey
# from geventwebsocket.handler import WebSocketHandler # from geventwebsocket.handler import WebSocketHandler
from queue import Queue # from queue import Queue
import adc2019system import adc2019system
......
...@@ -147,6 +147,9 @@ class Host(object): ...@@ -147,6 +147,9 @@ class Host(object):
elif cmd == 'stop': elif cmd == 'stop':
self.worker_manager.request_stop() self.worker_manager.request_stop()
return {} return {}
elif cmd == 'worker/status':
self.worker_manager.update_status(params['address'], params['status'])
return {'status': 'updated'}
elif cmd == 'view': elif cmd == 'view':
problem_key = params['problem'] problem_key = params['problem']
solution_id = params['solution'] solution_id = params['solution']
...@@ -201,7 +204,17 @@ class WorkerManager(object): ...@@ -201,7 +204,17 @@ class WorkerManager(object):
def __init__(self, host_address): def __init__(self, host_address):
self.workers = dict() self.workers = dict()
self.host = host_address self.host = host_address
# self.heartbeat_thread = threading.Thread(name='heartbeat', target=self.heartbeat, daemon=True)
# self.heartbeat_thread.start()
# def heartbeat(self):
# while True:
# for k, v in self.workers.items():
# v.update_status()
# time.sleep(1)
def add_worker(self, conf): def add_worker(self, conf):
worker_conf = dict() worker_conf = dict()
...@@ -213,6 +226,9 @@ class WorkerManager(object): ...@@ -213,6 +226,9 @@ class WorkerManager(object):
def get_workers(self): def get_workers(self):
return self.workers return self.workers
def update_status(self, address, status):
self.workers[address].status = status
def request_stop(self): def request_stop(self):
self.broadcast('stop', {}) self.broadcast('stop', {})
...@@ -237,10 +253,17 @@ class Worker(object): ...@@ -237,10 +253,17 @@ class Worker(object):
self.host = params['host'] self.host = params['host']
self.role = params['role'] self.role = params['role']
self.params = params self.params = params
self.status = 'Ready' self.status = 'Not ready'
self.configure() self.configure()
def update_status(self):
res = self.post('status', None)
if res is None:
self.status = 'Not connected'
else:
self.status = res.json()['status']
def post(self, path, data): def post(self, path, data):
try: try:
response = requests.post( response = requests.post(
...@@ -250,7 +273,6 @@ class Worker(object): ...@@ -250,7 +273,6 @@ class Worker(object):
timeout=2) timeout=2)
print(f"I: Post to {self.address}, API Cmd: {path}") print(f"I: Post to {self.address}, API Cmd: {path}")
except Exception as e: except Exception as e:
# sys.stderr.write(str(e) + "\n")
print(f"W: Failed to connect {self.address}") print(f"W: Failed to connect {self.address}")
self.status = 'Not connected' self.status = 'Not connected'
response = None response = None
......
...@@ -6,6 +6,8 @@ import sys ...@@ -6,6 +6,8 @@ import sys
import time import time
import threading import threading
from collections import OrderedDict
class Solver(object): class Solver(object):
def __init__(self, config): def __init__(self, config):
...@@ -15,7 +17,15 @@ class Solver(object): ...@@ -15,7 +17,15 @@ class Solver(object):
self.address = config['address'] self.address = config['address']
self.name = config['name'] self.name = config['name']
self.solver = importlib.import_module(f"solvers.{config['solver']}") self.solver = importlib.import_module(f"solvers.{config['solver']}")
self.thread = None self.queue = OrderedDict()
self.status = None
# self.thread = None
self.thread = threading.Thread(name='solver', target=self.solver_thread, daemon=True)
self.thread.start()
self.set_status('Ready')
def __del__(self): def __del__(self):
self.stop_solver() self.stop_solver()
...@@ -23,6 +33,26 @@ class Solver(object): ...@@ -23,6 +33,26 @@ class Solver(object):
def __repr__(self): def __repr__(self):
return "Solver" return "Solver"
def set_status(self, status):
self.status = status
self.post('worker/status', {'address': self.address, 'status': self.status})
def solver_thread(self):
while True:
if len(self.queue) > 0:
print("I: Solver started")
_, params = self.queue.popitem(last=False)
# self.status = f'Running ({len(self.queue)} in queue)'
self.set_status(f'Running ({len(self.queue)} in queue)')
self.solve(params)
# self.status = 'Ready'
self.set_status('Ready')
else:
# self.status = 'Ready'
self.set_status('Ready')
time.sleep(0.5)
def solve(self, params): def solve(self, params):
start_time = time.time() start_time = time.time()
...@@ -35,15 +65,19 @@ class Solver(object): ...@@ -35,15 +65,19 @@ class Solver(object):
solution['elapsed_time'] = elapsed_time solution['elapsed_time'] = elapsed_time
self.submit_solution(params, solution) self.submit_solution(params, solution)
self.thread = None # self.thread = None
return True return True
def post(self, path, data): def post(self, path, data):
response = requests.post( try:
f'http://{self.host}/api/{path}', response = requests.post(
json.dumps(data), f'http://{self.host}/api/{path}',
headers={'Content-Type': 'application/json'}) json.dumps(data),
print(f"I: Post to {self.host}, API Cmd: {path}") headers={'Content-Type': 'application/json'})
print(f"I: Post to {self.host}, API Cmd: {path}")
except Exception as e:
print(f"W: Failed to connect to the host")
response = None
return response return response
def submit_solution(self, params, solution): def submit_solution(self, params, solution):
...@@ -57,21 +91,19 @@ class Solver(object): ...@@ -57,21 +91,19 @@ class Solver(object):
def start_solver(self, params): def start_solver(self, params):
if self.thread is None: print("I: Problem queued")
print("I: Solver started") _id = params['request_id']
# self.thread = StoppableThread(target=self.solve, args=(params, )) self.queue[_id] = params
self.thread = threading.Thread(name='solver', target=self.solve, args=(params, ), daemon=True) # self.status = f'Running ({len(self.queue)} in queue)'
self.thread.start() self.set_status(f'Running ({len(self.queue)} in queue)')
return {'status': 'started'} return {'status': self.status}
else:
return {'status': 'busy'}
def stop_solver(self): def stop_solver(self):
if self.thread is not None: if self.thread is not None:
self.solver.stop() self.solver.stop()
return {'status': 'stopped'} return {'status': self.status}
def call_api(self, method, cmd, params): def call_api(self, method, cmd, params):
if cmd == 'role': if cmd == 'role':
...@@ -80,5 +112,7 @@ class Solver(object): ...@@ -80,5 +112,7 @@ class Solver(object):
return self.start_solver(params) return self.start_solver(params)
elif cmd == 'stop': elif cmd == 'stop':
return self.stop_solver() return self.stop_solver()
elif cmd == 'status':
return {'status': self.status}
else: else:
return None return None
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment