import heapq
from .agent import Agent, Weapon, Action, TileNode, Module, Supply, Hanger, Transport
from .agent import MoveType, TerrainType, ActionType, AgentType, ModuleType
from typing import Tuple, List, Dict, Optional, Union
import numpy as np

# Movement cost per (move type, terrain type). Rows are indexed by
# ``MoveType.value`` and columns by ``TerrainType.value``. A cost of 0 is
# treated as "impassable" by ``astar_search``.
cost_matrix = [
    # Plain  Road  Sea  Hill
    [1,      1,    1,   1],  # Air
    [1.5,    1,    0,   2],  # Ground
    [0,      0,    1,   0],  # Sea
    [0,      0,    1,   0],  # Subwater
]


def get_cost(move_type: MoveType, terrain_type: TerrainType):
    """Return the ``cost_matrix`` entry for ``move_type`` on ``terrain_type``.

    Both arguments are looked up through their ``.value`` attribute.
    """
    return cost_matrix[move_type.value][terrain_type.value]


def get_axial_dis(pos1: Tuple[int, int], pos2: Tuple[int, int] = (0, 0)):
    """Return the hex-grid distance between two axial coordinates.

    The result is a float because of the true division by 2.
    ``pos2`` defaults to the origin.
    """
    q1, r1 = pos1
    q2, r2 = pos2
    return (abs(q1 - q2) + abs(r1 - r2) + abs((q1 + r1) - (q2 + r2))) / 2


def astar_search(map_data: Dict[Tuple[int, int], object],
                 move_type: MoveType,
                 start: Union[Tuple[int, int], List[Tuple[int, int]]],
                 goal: Optional[Tuple[int, int]] = None,
                 limit: float = float('inf'),
                 return_cost: bool = False
                 ) -> Union[List[Tuple[int, int]], Tuple[float, List[Tuple[int, int]]]]:
    """
    A* search on the hex map from ``start`` to ``goal``.

    map_data: dictionary mapping node coordinates to node objects; each node
        must expose a ``terrain_type`` attribute.
    move_type: movement type used to look up terrain costs via ``get_cost``.
    start: a single coordinate tuple, or a list of coordinates for a
        multi-source search. In multi-source mode the edge cost is taken from
        the terrain of the node being expanded rather than the neighbor
        (``get_path`` uses this mode to search backwards from the
        destinations towards the agent).
    goal: the target coordinate. When ``None`` no heuristic is used and the
        search explores every node it can reach within ``limit``.
    limit: nodes whose estimated total cost exceeds this value are not
        enqueued.
    return_cost: when true (and ``goal`` is given) return
        ``(total_cost, path)`` instead of just ``path``.

    Returns:
        With a goal, single source: the positions from the first step after
        ``start`` up to and including ``goal``.
        With a goal, multi-source: the positions from the first step after
        ``goal`` up to and including the source that was reached (just
        ``[goal]`` if the goal is itself a source).
        Without a goal: a list of all expanded positions (unordered).

    Raises:
        ValueError: if ``start`` is neither a tuple nor a list, or if a goal
            was given and cannot be reached.

    Among equal-cost parents one is chosen at random (reservoir sampling via
    ``np.random``), so the returned path may differ between calls.
    """
    if isinstance(start, tuple):
        start = [start]
        multi_source = False
    elif isinstance(start, list):
        multi_source = True
    else:
        raise ValueError(f"Invalid start type. Found{type(start)} but expected tuple or list.")

    sources = set(start)
    g = {}                  # best known cost from a source to each node
    f = {}                  # g + heuristic for each node
    parent = {}             # back-pointer used to rebuild the path
    cand_parent_count = {}  # number of equal-cost parents seen (for sampling)
    open_list = []
    for pos in sources:
        g[pos] = 0
        f[pos] = 0
        parent[pos] = None
        cand_parent_count[pos] = 1
        open_list.append((0.0, pos))
    heapq.heapify(open_list)

    closed_list = set()
    directions = [(1, 0), (1, -1), (0, -1), (-1, 0), (-1, 1), (0, 1)]

    while open_list:
        _, cur = heapq.heappop(open_list)
        if cur in closed_list:
            # Stale heap entry: the node was re-pushed with a better priority
            # and has already been expanded.
            continue

        if goal is not None and cur == goal:
            total_cost = g[cur]
            path = []
            while cur not in sources:
                path.append(cur)
                cur = parent[cur]
            if multi_source:
                if path:
                    path.pop(0)   # drop the position currently occupied
                path.append(cur)  # the source reached is the path's endpoint
            else:
                path.reverse()
            return (total_cost, path) if return_cost else path

        closed_list.add(cur)

        for dq, dr in directions:
            neighbor = (cur[0] + dq, cur[1] + dr)
            if neighbor not in map_data or neighbor in closed_list:
                continue

            if multi_source:
                cost = get_cost(move_type, map_data[cur].terrain_type)
            else:
                cost = get_cost(move_type, map_data[neighbor].terrain_type)
            if cost == 0:
                continue  # impassable for this move type

            tentative_g = g[cur] + cost
            if neighbor not in g or tentative_g < g[neighbor]:
                g[neighbor] = tentative_g
                f[neighbor] = tentative_g + (get_axial_dis(neighbor, goal) if goal is not None else 0)
                if f[neighbor] > limit:
                    continue
                parent[neighbor] = cur
                cand_parent_count[neighbor] = 1
                # Always push: an older entry with a worse priority may still
                # be in the heap and is skipped when it is popped later.
                heapq.heappush(open_list, (f[neighbor], neighbor))
            elif tentative_g == g[neighbor] and f[neighbor] <= limit:
                cand_parent_count[neighbor] += 1
                if np.random.random() < 1 / cand_parent_count[neighbor]:  # reservoir sampling
                    parent[neighbor] = cur

    if goal is not None:
        raise ValueError("No path found")
    return list(closed_list)


def get_path(agent, map_data, start, des: Union[Tuple[int, int], List[Tuple[int, int]]],
             limit=float('inf')) -> List[Tuple[int, int]]:
    """
    Returns the waypoints the agent should move through to get from ``start``
    to the destination.

    agent: object exposing ``move_type``, ``fuel`` and ``mobility``.
    map_data: dictionary mapping coordinates to nodes with ``terrain_type``.
    start: the coordinate the path starts from.
    des: a single destination tuple, or a list of candidate destinations
        (the search then runs multi-source from the candidates back to
        ``start``).
    limit: forwarded to ``astar_search``.

    The raw path is split wherever the accumulated terrain cost would exceed
    ``agent.mobility``; the node before each split and the final node are
    returned.

    Raises:
        ValueError: if ``des`` has an unsupported type, no path exists (this
            includes an empty raw path), or the fuel check fails.
    """
    if isinstance(des, tuple):
        raw_path = astar_search(map_data=map_data, move_type=agent.move_type,
                                start=start, goal=des, limit=limit)
    elif isinstance(des, list):  # multi-source
        raw_path = astar_search(map_data=map_data, move_type=agent.move_type,
                                start=des, goal=start, limit=limit)
    else:
        raise ValueError(f"Invalid destination type. Found{type(des)} but expected tuple or list.")

    if not raw_path:
        raise ValueError("No path found")

    # Create the frame path by breaking the path into smaller moves
    path = []
    spent = 0
    raw_path = [start] + raw_path
    for prev, node in zip(raw_path[:-1], raw_path[1:]):
        cost = get_cost(agent.move_type, map_data[node].terrain_type)
        spent += cost
        # NOTE(review): ``spent`` is reset at every mobility split below, so
        # this compares the current segment (not the whole path) against
        # ``agent.fuel`` - confirm that this is the intended fuel rule.
        if spent > agent.fuel:
            raise ValueError(f"Not enough fuel to reach {des}")
        if spent > agent.mobility:
            # Entering ``node`` does not fit in the current move: stop at
            # ``prev`` and start a new move with this step's cost.
            spent = cost
            path.append(prev)
    path.append(node)
    return path


def axial_to_cube_dict(a):
    """Convert axial ``(q, r)`` to a cube-coordinate dict with keys x, y, z."""
    q, r = a
    return {
        "x": q,
        "y": -(q + r),
        "z": r
    }


def cube_dict_to_axial(c):
    """Convert a cube-coordinate dict (keys ``x`` and ``z`` are read) to axial."""
    return (c["x"], c["z"])


def axial_to_cube(a):
    """Convert axial ``(q, r)`` to a cube-coordinate tuple ``(x, y, z)``."""
    q, r = a
    return (q, -(q + r), r)


def cube_to_axial(c):
    """Convert a cube-coordinate sequence to axial ``(c[0], c[2])``."""
    return (c[0], c[2])


def encode_axial(pos: Tuple[int, int]) -> int:
    """Map an axial coordinate to a unique non-negative integer.

    The origin is 0; the ring at distance ``d`` occupies the ``6 * d``
    consecutive numbers starting at ``3 * d * (d - 1) + 1``, split into six
    sectors of ``d`` cells each. ``decode_axial`` is the inverse.
    """
    q, r = pos
    if q == 0 and r == 0:
        return 0
    x, y, z = axial_to_cube(pos)
    # get_axial_dis returns a float; cast so that the result is a real int.
    d = int(get_axial_dis(pos))
    base_number = 3 * d * (d - 1) + 1
    if x > 0 and y < 0 and z >= 0:     # sector 0
        return base_number + z
    elif x <= 0 and y < 0 and z > 0:   # sector 1
        return base_number + d - x
    elif x < 0 and y >= 0 and z > 0:   # sector 2
        return base_number + d * 2 + y
    elif x < 0 and y > 0 and z <= 0:   # sector 3
        return base_number + d * 3 - z
    elif x >= 0 and y > 0 and z < 0:   # sector 4
        return base_number + d * 4 + x
    else:  # x > 0 and y <= 0 and z < 0, sector 5
        return base_number + d * 5 - y


def decode_axial(num: int) -> Tuple[int, int]:
    """Inverse of ``encode_axial``: map an index back to its axial coordinate.

    Raises:
        ValueError: if ``num`` is negative.
    """
    if num < 0:
        raise ValueError(f"Invalid axial index: {num}")
    if num == 0:
        return (0, 0)

    # Binary search for the smallest ring d with range_to_count(d) >= num.
    lo, hi = 1, num
    while lo < hi:
        mid = (lo + hi) // 2
        if range_to_count(mid) + 1 > num:
            hi = mid
        else:
            lo = mid + 1
    d = lo
    base_number = range_to_count(d - 1) + 1

    # k is the sector (0-5) and mod the offset inside that sector.
    k, mod = divmod(num - base_number, d)
    if k == 0:
        q = d - mod
        r = mod
    elif k == 1:
        q = -mod
        r = d
    elif k == 2:
        q = -d
        r = d - mod
    elif k == 3:
        q = -(d - mod)
        r = -mod
    elif k == 4:
        q = mod
        r = -d
    else:  # k == 5
        q = d
        r = -(d - mod)
    return (q, r)


def get_adj_pos(pos: Tuple[int, int], max_dis: int) -> List[Tuple[int, int]]:
    """
    Returns a list of adjacent positions within the given range.

    ``pos`` itself is not included; ``max_dis`` is truncated with ``int()``.
    """
    d = [decode_axial(num) for num in range(1, range_to_count(int(max_dis)) + 1)]
    return [(pos[0] + dq, pos[1] + dr) for dq, dr in d]


def range_to_count(max_dis: int) -> int:
    """Return ``3 * max_dis * (max_dis + 1)``: the number of hex cells within
    ``max_dis`` of a cell, not including the cell itself."""
    return 3 * max_dis * (max_dis + 1)


def dict_to_action(action_dict: dict) -> Action:
    """Build an ``Action`` from a dict.

    ``action_type`` and ``agent_id`` are required keys; every other field
    falls back to the default shown below when missing.
    """
    return Action(
        action_type=ActionType.from_input(action_dict["action_type"]),
        agent_id=action_dict["agent_id"],
        target_id=action_dict.get("target_id", ""),
        des=action_dict.get("des", (-1, -1)),
        weapon_id=action_dict.get("weapon_id", -1),
        weapon_name=action_dict.get("weapon_name", ""),
        start_time=action_dict.get("start_time", 0),
        end_type=action_dict.get("end_type", None),
        attack_count=action_dict.get("attack_count", 0),
    )


def dict_to_node(tile_dict: dict) -> TileNode:
    """Build a ``TileNode`` from a dict with ``pos`` (``q``/``r``) and
    ``terrain_type`` keys."""
    return TileNode(
        pos=(tile_dict["pos"]["q"], tile_dict["pos"]["r"]),
        terrain_type=TerrainType.from_input(tile_dict["terrain_type"]),
        # agent_id=tile_dict.get("agent_id", None),
        # is_city=tile_dict.get("is_city", False),
        # chaotic_value=tile_dict.get("chaotic_value", 0),
        # occupy_value=tile_dict.get("occupy_value", 0),
        # occupied_by=tile_dict.get("occupied_by", None)
    )


def dict_to_weapon(weapon_dict: dict) -> Weapon:
    """Build a ``Weapon`` from a dict; all keys read below are required."""
    return Weapon(
        name=weapon_dict["name"],
        attack_range=weapon_dict["attack_range"],
        damage=weapon_dict["damage"],
        max_ammo=weapon_dict["max_ammo"],
        strike_types=[MoveType.from_input(strike_type) for strike_type in weapon_dict["strike_types"]]
    )


def dict_to_module(module_dict: dict) -> Module:
    """Build a ``Hanger``, ``Supply`` or ``Transport`` module from a dict.

    ``module_id`` selects the module class. ``available_types`` may be
    missing or ``None`` (treated as empty) and ``capacity`` defaults to 0.

    Raises:
        ValueError: if the module type is not one of the three handled here.
    """
    module_type = ModuleType.from_input(module_dict["module_id"])
    available_types = [AgentType.from_input(agent_type)
                       for agent_type in module_dict.get("available_types") or []]
    capacity = module_dict.get("capacity", 0)
    if module_type == ModuleType.HANGER:
        return Hanger(available_types=available_types, capacity=capacity)
    elif module_type == ModuleType.SUPPLY:
        return Supply(available_types=available_types, capacity=capacity)
    elif module_type == ModuleType.TRANSPORT:
        return Transport(available_types=available_types, capacity=capacity)
    else:
        raise ValueError(f"Invalid module type: {module_type}")


def dict_to_agent(agent_dict: dict) -> Agent:
    """Build an ``Agent`` from a dict.

    ``info_level`` and ``stealth_level`` default to 0. ``switchable_weapons``
    and ``modules`` may be missing or ``None`` and are then treated as empty
    lists. All other keys read below are required.
    """
    return Agent(
        agent_id=agent_dict["agent_id"],
        agent_type=AgentType.from_input(agent_dict["type"]),
        team_id=agent_dict["team_id"],
        faction_id=agent_dict["faction_id"],
        move_type=MoveType.from_input(agent_dict["move_type"]),
        pos=(agent_dict["pos"]["q"], agent_dict["pos"]["r"]),
        info_level=agent_dict.get("info_level", 0),
        stealth_level=agent_dict.get("stealth_level", 0),
        max_endurance=agent_dict["max_endurance"],
        defense=agent_dict["defense"],
        max_fuel=agent_dict["max_fuel"],
        mobility=agent_dict["mobility"],
        switchable_weapons=[dict_to_weapon(weapon_dict)
                            for weapon_dict in agent_dict.get("switchable_weapons") or []],
        modules=[dict_to_module(module_dict)
                 for module_dict in agent_dict.get("modules") or []]
    )