import heapq from .agent import Agent, Weapon, Action, TileNode, Module, Supply, Hangar, Transport from .agent import MoveType, TerrainType, ActionType, AgentType, ModuleType from typing import Tuple, List, Dict, Union import numpy as np import json # Define the cost matrix for agent types and terrain types cost_matrix = [ # Plain Road Sea Hill [ 1, 1, 1, 1], # Air [ 1.5, 1, 0, 2], # Ground [ 0, 0, 1, 0], # Sea [ 0, 0, 1, 0], # Subwater ] def get_cost(move_type: MoveType, terrain_type: TerrainType): return cost_matrix[move_type.value][terrain_type.value] def get_axial_dis(pos1: Tuple[int, int], pos2: Tuple[int, int] = (0, 0)): q1, r1 = pos1 q2, r2 = pos2 return (abs(q1 - q2) + abs(r1 - r2) + abs((q1 + r1) - (q2 + r2))) / 2 def astar_search(map_data: Dict[Tuple[int, int], object], move_type: MoveType, start: Tuple[int, int], goal: Tuple[int, int] = None, limit: float = float('inf'), return_cost: bool = False) -> Union[List[Tuple[int, int]], Tuple[float, List[Tuple[int, int]]]]: """ A* search algorithm to find the shortest path from start to goal. map_data: a dictionary of nodes, where the keys are the node coordinates and the values are the node objects move_type: the type of agent to consider (0 for air, 1 for ground, 2 for sea, 3 for subwater) start: the starting node goal: the ending node """ if isinstance(start, tuple): start = [start] multi_source = False elif isinstance(start, list): multi_source = True else: raise ValueError(f"Invalid start type. Found{type(start)} but expected tuple or list.") g = {} f = {} parent = {} open_list = [] cand_parent_count = {} for pos in start: g[pos] = 0 f[pos] = 0 parent[pos] = None cand_parent_count[pos] = 1 open_list.append((0.0, pos)) in_open_list = set(start) heapq.heapify(open_list) closed_list = set() directions = [(1, 0), (1, -1), (0, -1), (-1, 0), (-1, 1), (0, 1)] while open_list: _, cur = heapq.heappop(open_list) in_open_list.remove(cur) if goal and cur == goal: path = [] while cur not in start: path.append(cur) cur = parent[cur] if multi_source: if path: path.pop(0) # 弹出实际上当前所在位置 path.append(cur) # 确定多源最短路的终点 else: path.reverse() return (cost, path) if return_cost else path closed_list.add(cur) for dq, dr in directions: neighbor = (cur[0] + dq, cur[1] + dr) if neighbor in map_data and neighbor not in closed_list: cost = get_cost(move_type, map_data[neighbor].terrain_type) \ if not multi_source \ else get_cost(move_type, map_data[cur].terrain_type) if cost == 0: continue tentative_g = g[cur] + cost if neighbor not in g or tentative_g < g[neighbor]: g[neighbor] = tentative_g f[neighbor] = tentative_g + (get_axial_dis(neighbor, goal) if goal else 0) if f[neighbor] > limit: continue parent[neighbor] = cur cand_parent_count[neighbor] = 1 if neighbor not in in_open_list: heapq.heappush(open_list, (f[neighbor], neighbor)) in_open_list.add(neighbor) elif neighbor in g and tentative_g == g[neighbor] and f[neighbor] <= limit: cand_parent_count[neighbor] += 1 if np.random.random() < 1 / cand_parent_count[neighbor]: # reservior sampling parent[neighbor] = cur if goal is not None: raise ValueError("No path found") return list(closed_list) def get_path(agent, map_data, start, des: Union[Tuple[int, int], List[Tuple[int, int]]], limit=float('inf')) -> List[Tuple[int, int]]: """ Returns a list of move actions to move the agent from its current position to the destination. """ if isinstance(des, tuple): raw_path = astar_search(map_data=map_data, move_type=agent.move_type, start=start, goal=des, limit=limit) elif isinstance(des, list): # multi-source raw_path = astar_search(map_data=map_data, move_type=agent.move_type, start=des, goal=start, limit=limit) else: raise ValueError(f"Invalid destination type. Found{type(des)} but expected tuple or list.") if not raw_path: raise ValueError("No path found") # Create the frame path by breaking the path into smaller moves path = [] sum = 0 raw_path = [start] + raw_path # print(agent.pos, des, raw_path) for parent, node in zip(raw_path[:-1], raw_path[1:]): cost = get_cost(agent.move_type, map_data[node].terrain_type) sum += cost if sum > agent.fuel: raise ValueError(f"Not enough fuel to reach {des}") if sum > agent.mobility: sum = cost path.append(parent) path.append(node) return path def axial_to_cube_dict(a): q, r = a return { "x": q, "y": -(q + r), "z": r } def cube_dict_to_axial(c): return (c["x"], c["z"]) def axial_to_cube(a): q, r = a return (q, -(q + r), r) def cube_to_axial(c): return (c[0], c[2]) def encode_axial(pos: Tuple[int, int]) -> int: q, r = pos if q == 0 and r == 0: return 0 x, y, z = axial_to_cube(pos) d = get_axial_dis(pos) base_number = 3 * d * (d - 1) + 1 if x > 0 and y < 0 and z >= 0: # 0 return base_number + z elif x <= 0 and y < 0 and z > 0: # 1 return base_number + d - x elif x < 0 and y >= 0 and z > 0: # 2 return base_number + d * 2 + y elif x < 0 and y > 0 and z <= 0: # 3 return base_number + d * 3 - z elif x >= 0 and y > 0 and z < 0: # 4 return base_number + d * 4 + x else: # x > 0 and y <= 0 and z < 0 # 5 return base_number + d * 5 - y def decode_axial(num: int) -> Tuple[int, int]: if num == 0: return (0, 0) l, r = 1, num while l < r: mid = (l + r) // 2 if range_to_count(mid) + 1 > num: r = mid else: l = mid + 1 d = l base_number = range_to_count(d - 1) + 1 # base_number = 1 # d = 1 # while base_number + 6 * d <= num: # base_number += 6 * d # d += 1 k = (num - base_number) // d mod = (num - base_number) % d if k == 0: q = d - mod r = mod elif k == 1: q = -mod r = d elif k == 2: q = -d r = d - mod elif k == 3: q = -(d - mod) r = -mod elif k == 4: q = mod r = -d else: # k == 5 q = d r = -(d - mod) return (q, r) def get_adj_pos(pos: Tuple[int, int], max_dis: int) -> List[Tuple[int, int]]: """ Returns a list of adjacent positons within the given range. """ d = [decode_axial(num) for num in range(1, range_to_count(int(max_dis)) + 1)] return [(pos[0] + dq, pos[1] + dr) for dq, dr in d] def range_to_count(max_dis: int) -> int: # not including 0 return 3 * max_dis * (max_dis + 1) def dict_to_action(action_dict: dict) -> Action: return Action( action_type=ActionType.from_input(action_dict["action_type"]), agent_id=action_dict["agent_id"], target_id=action_dict.get("target_id", ""), des=action_dict.get("des", (-1, -1)), weapon_id=action_dict.get("weapon_id", -1), weapon_name=action_dict.get("weapon_name", ""), start_time=action_dict.get("start_time", 0), end_type=action_dict.get("end_type", None), attack_count=action_dict.get("attack_count", 0), ) def dict_to_node(tile_dict: dict) -> TileNode: return TileNode( pos=(tile_dict["pos"]["q"], tile_dict["pos"]["r"]), terrain_type=TerrainType.from_input(tile_dict["terrain_type"]), # agent_id=tile_dict.get("agent_id", None), # is_city=tile_dict.get("is_city", False), # chaotic_value=tile_dict.get("chaotic_value", 0), # occupy_value=tile_dict.get("occupy_value", 0), # occupied_by=tile_dict.get("occupied_by", None) ) def dict_to_weapon(weapon_dict: dict) -> Weapon: return Weapon( name=weapon_dict["name"], attack_range=weapon_dict["attack_range"], damage=weapon_dict["damage"], max_ammo=weapon_dict["max_ammo"], strike_types=[MoveType.from_input(strike_type) for strike_type in weapon_dict["strike_types"]] ) def dict_to_module(module_dict: dict) -> Module: module_type = ModuleType.from_input(module_dict["module_id"]) available_types = [AgentType.from_input(agent_type) for agent_type in module_dict.get("available_types", [])] \ if module_dict.get("available_types", []) is not None else [] capacity = module_dict.get("capacity", 0) if module_type == ModuleType.HANGAR: return Hangar(available_types=available_types, capacity=capacity) elif module_type == ModuleType.SUPPLY: return Supply(available_types=available_types, capacity=capacity) elif module_type == ModuleType.TRANSPORT: return Transport(available_types=available_types, capacity=capacity) else: raise ValueError(f"Invalid module type: {module_type}") def dict_to_agent(agent_dict: dict) -> Agent: return Agent( agent_id=agent_dict["agent_id"], agent_type=AgentType.from_input(agent_dict["type"]), team_id=agent_dict["team_id"], faction_id=agent_dict["faction_id"], move_type=MoveType.from_input(agent_dict["move_type"]), pos=(agent_dict["pos"]["q"], agent_dict["pos"]["r"]), info_level=agent_dict.get("info_level", 0), stealth_level=agent_dict.get("stealth_level", 0), max_endurance=agent_dict["max_endurance"], defense=agent_dict["defense"], max_fuel=agent_dict["max_fuel"], mobility=agent_dict["mobility"], switchable_weapons=[dict_to_weapon(weapon_dict) for weapon_dict in agent_dict.get("switchable_weapons", [])], modules=[dict_to_module(module_dict) for module_dict in agent_dict.get("modules", [])] if agent_dict.get("modules", []) is not None else [] ) def json_to_markdown(json_data): # 解析JSON数据 if isinstance(json_data, str): json_data = json.loads(json_data) # 递归转换字典 def dict_to_markdown(d, indent=0): markdown = "" for key, value in d.items(): markdown += " " * indent + f"- {key}: " if isinstance(value, dict): markdown += "\n" + dict_to_markdown(value, indent + 1) elif isinstance(value, list): markdown += "\n" + list_to_markdown(value, indent + 1) else: markdown += f"{value}\n" return markdown # 递归转换列表 def list_to_markdown(lst, indent=0): markdown = "" for item in lst: if isinstance(item, dict): markdown += dict_to_markdown(item, indent) elif isinstance(item, list): markdown += list_to_markdown(item, indent) else: markdown += " " * indent + f"- {item}\n" return markdown # 判断是否是字典或列表 if isinstance(json_data, dict): return dict_to_markdown(json_data) elif isinstance(json_data, list): return list_to_markdown(json_data) else: return str(json_data) def increase_markdown_indent(markdown_str, levels=1, use_tabs=False): # 根据需要使用空格或制表符来增加缩进 indent = "\t" * levels if use_tabs else " " * levels # 将每一行加上缩进 lines = markdown_str.split("\n") indented_lines = [indent + line for line in lines] # 返回增加层级后的Markdown字符串 return "\n".join(indented_lines) def agent_list_to_markdown(agent_list): result = "" for agent in agent_list: result += f"### Agent {agent.agent_id}\n" result += json_to_markdown(agent.to_dict()) + "\n" return result def battlefield_list_to_markdown(battlefield_list): result = "" for battlefield in battlefield_list: result += f"### Battlefield {battlefield['sectorName']}\n" result += json_to_markdown(battlefield) + "\n" return result