import datetime import json import os import queue import re import time import uuid class Problem(object): def __init__(self, problem_path, solution_path=None): self.path = problem_path self.name = '' self.size = (0, 0) self.block_num = 0 self.blocks = dict() self.problem = '' # self.status = 'Ready' self.solutions = dict() self.solution_path = solution_path self.best_solution = None self.line_numbers = list() self.connection = tuple() self.block_groups = list() self._load_problem(problem_path) @property def size_str(self): return f'{self.size[0]}X{self.size[1]}' @property def problem_text(self): def intplus(v): if v.isdecimal(): return int(v) else: return v _text = '' _text += f'SIZE {self.size_str}\n' _text += f'BLOCK_NUM {self.block_num}\n' _text += '\n' for bi, block in self.blocks.items(): _text += f'BLOCK#{bi} {block["w"]}X{block["h"]}\n' for cr in block['cells']: for cci, cc in enumerate(cr): if cci > 0: _text += ',' if isinstance(cc, int) and cc > 0: cc = self.line_numbers.index(cc) _text += f'{cc}' _text += '\n' _text += '\n' return _text @property def status(self): _status = 'None' if self.problem != '': _status = 'Ready' if len(self.solutions) > 0: _solved_count = 0 _trial_count = 0 for k, v in self.solutions.items(): _trial_count += 1 if v.is_valid_solution(): _solved_count += 1 if _solved_count > 0: _status = f'Solved ({_solved_count}/{_trial_count})' else: _status = f'Tried ({_trial_count})' if not self.best_solution is None: _status = f'Saved' return _status def _load_problem(self, path): with open(path, 'r') as fp: q_text = fp.read() q_lines = q_text.splitlines() board_size = [0, 0] block_num = 0 blocks = dict() def intplus(v): if v.isdecimal(): return int(v) else: return v line_number_list = [0] block_to_line = dict() line_to_block = dict() li = 0 while li < len(q_lines): _l = q_lines[li] if "SIZE" in _l: board_size_str = _l.strip().split()[1] board_size = [int(v) for v in board_size_str.split('X')] if 'BLOCK_NUM' in _l: block_num = int(_l.strip().split()[1]) if 'BLOCK#' in _l: p = r'BLOCK#([0-9]+) +([0-9]+)X([0-9]+)' m = re.match(p, _l.strip()) bi = int(m.group(1)) bw = int(m.group(2)) bh = int(m.group(3)) blocks[bi] = { 'index': bi, 'w': bw, 'h': bh, 'cells': list() } for _h in range(bh): li += 1 _l = q_lines[li].strip() _block_row = [] for v in _l.split(','): _line_num = intplus(v.strip()) if isinstance(_line_num, int) and _line_num > 0: # Line number conversion if not _line_num in line_number_list: line_number_list.append(_line_num) # _line_num = line_number_list.index(_line_num) # Make connection list if not bi in block_to_line: block_to_line[bi] = list() block_to_line[bi].append(_line_num) if not _line_num in line_to_block: line_to_block[_line_num] = list() line_to_block[_line_num].append(bi) _block_row.append(_line_num) blocks[bi]['cells'].append(_block_row) # blocks[bi]['cells'].append([intplus(v.strip()) for v in _l.split(',')]) li += 1 name = os.path.splitext(os.path.basename(path))[0] self.size = board_size self.block_num = block_num self.blocks = blocks self.name = name self.problem = q_text self.line_numbers = line_number_list self.connection = (line_to_block, block_to_line) # self.status = 'Ready' self._analyze_block_groups() def _analyze_block_groups(self): traversed_block = list() line_to_block = self.connection[0] block_to_line = self.connection[1] block_groups = list() for b in block_to_line.keys(): if b in traversed_block: continue traverse_block_queue = queue.Queue() traverse_block_queue.put(b) target_group_blocks = list() while not traverse_block_queue.empty(): target_block = traverse_block_queue.get() target_block_lines = block_to_line[target_block] if target_block in traversed_block: continue traversed_block.append(target_block) target_group_blocks.append(target_block) for line in target_block_lines: corresponding_blocks = line_to_block[line] if corresponding_blocks[0] == target_block: next_block = corresponding_blocks[1] else: next_block = corresponding_blocks[0] if next_block in traversed_block: continue else: traverse_block_queue.put(next_block) block_groups.append(target_group_blocks) self.block_groups = block_groups def get_dict(self): return { 'name': self.name, 'size': self.size, 'size_str': self.size_str, 'block_num': self.block_num, 'problem': self.problem, # 'status': self.status } def get_d3json(self): return { 'block': self.blocks, 'w': self.size[0], 'h': self.size[1], 'n': self.block_num } def put_solution(self, data): solution = Solution(data) solution_id = solution.get_id() print(f'I: Put a solution: {solution_id}') self.solutions[solution_id] = solution # Save solution solution.save(self.solution_path) def save_best_solution(self): best_score = None for k, v in self.solutions.items(): if v.is_valid_solution(): _score = v.score if (best_score is None) or (_score < best_score[1]): best_score = (k, _score) if best_score is not None: solution_name = self.name outpath = f'{self.solution_path}/submit' if not os.path.exists(outpath): os.mkdir(outpath) best_solution_key = best_score[0] with open(f'{outpath}/{solution_name}.txt', 'w') as fp: fp.write(self.solutions[best_solution_key].solution) with open(f'{outpath}/{solution_name}.json', 'w') as fp: json.dump(self.solutions[best_solution_key].get_dict(), fp, indent=4) self.best_solution = best_solution_key def get_solutions(self): return self.solutions def get_solution(self, solution_id): if solution_id in self.solutions: return self.solutions[solution_id] else: return None class Solution(object): def __init__(self, data): self.problem = data['problem'] self.request_id = data['request_id'] self.worker = data['worker'] self.elapsed_time = data['elapsed_time'] self.solution = data['solution'] self.status = data['status'] self.size = (None, None) self.map = None self.block = dict() self.timestamp = time.time() self._id = str(uuid.uuid4()) self._parse_solution() def _parse_solution(self): if self.status != 'done': return board_size = [0, 0] block_num = 0 _lines = self.solution.splitlines() li = 0 bw, bh = 0, 0 bmap = list() bposition = dict() state = 0 while li < len(_lines): _l = _lines[li].strip() if (state == 0) and ('SIZE' in _l): board_size_str = _l.strip().split()[1] board_solution_size = [int(v) for v in board_size_str.split('X')] bw = board_solution_size[0] bh = board_solution_size[1] for _h in range(bh): li += 1 _l = _lines[li].strip() bmap.append([int(v.strip()) for v in _l.split(',')]) state = 1 if (state == 1) and ('BLOCK' in _l): p = r'BLOCK#([0-9]+) +@\(([0-9]+), *([0-9]+)\)' m = re.match(p, _l.strip()) bi = int(m.group(1)) bx = int(m.group(2)) by = int(m.group(3)) bposition[bi] = { 'index': bi, 'x': bx, 'y': by, } li += 1 self.size = (bw, bh) self.map = bmap self.block = bposition @property def score(self): if self.is_valid_solution(): return self.size[0] * self.size[1] else: return None @property def size_str(self): if self.is_valid_solution(): return f'{self.size[0]}X{self.size[1]}' else: return '-' def is_valid_solution(self): return self.status == 'done' def get_id(self): return self._id def get_dict(self): return { 'id': self._id, 'timestamp': self.timestamp, 'request_id': self.request_id, 'worker': self.worker, 'elapsed_time': self.elapsed_time, 'problem': self.problem, 'solution': self.solution } def get_d3json(self): if self.status == 'done': return { 'w': self.size[0], 'h': self.size[1], 'map': self.map, 'block': self.block } else: return { 'w': 0, 'h': 0, 'map': [[]], 'block': dict() } def save(self, basedir): outdir = f"{basedir}/{self.problem}" if not os.path.exists(outdir): os.mkdir(outdir) outpath = f"{outdir}/{self.request_id}-{self.worker}.json".replace(":", ".") with open(outpath, 'w') as fp: json.dump(self.get_dict(), fp, indent=4) @property def timestamp_str(self): dt = datetime.datetime.fromtimestamp( self.timestamp, datetime.timezone(datetime.timedelta(hours=9))) return dt.strftime('%H:%M:%S.%f')