368 lines
13 KiB
Python
368 lines
13 KiB
Python
import heapq
|
|
from .agent import Agent, Weapon, Action, TileNode, Module, Supply, Hangar, Transport
|
|
from .agent import MoveType, TerrainType, ActionType, AgentType, ModuleType
|
|
from typing import Tuple, List, Dict, Union
|
|
import numpy as np
|
|
import json
|
|
|
|
# Movement cost table: one row per move type, one column per terrain type.
# A cost of 0 marks terrain that the move type cannot enter.
cost_matrix = [
    # Columns: Plain, Road, Sea, Hill
    [1, 1, 1, 1],      # Air
    [1.5, 1, 0, 2],    # Ground
    [0, 0, 1, 0],      # Sea
    [0, 0, 1, 0],      # Subwater
]
|
|
|
|
def get_cost(move_type: MoveType, terrain_type: TerrainType):
    """Look up the cost for ``move_type`` to enter a tile of ``terrain_type``.

    Both arguments are used through their ``.value`` attribute, which indexes
    the row (move type) and column (terrain type) of ``cost_matrix``.
    """
    costs_for_move_type = cost_matrix[move_type.value]
    return costs_for_move_type[terrain_type.value]
|
|
|
|
def get_axial_dis(pos1: Tuple[int, int], pos2: Tuple[int, int] = (0, 0)):
    """Return the hex-grid distance between two axial (q, r) coordinates.

    ``pos2`` defaults to the origin, so calling with one argument gives the
    distance from (0, 0). The result comes from a true division, so it is a
    float even for integer coordinates.
    """
    q1, r1 = pos1
    q2, r2 = pos2
    delta_q = abs(q1 - q2)
    delta_r = abs(r1 - r2)
    delta_sum = abs((q1 + r1) - (q2 + r2))
    return (delta_q + delta_r + delta_sum) / 2
|
|
|
|
def astar_search(map_data: Dict[Tuple[int, int], object],
                 move_type: MoveType,
                 start: Tuple[int, int],
                 goal: Tuple[int, int] = None,
                 limit: float = float('inf'),
                 return_cost: bool = False) -> Union[List[Tuple[int, int]], Tuple[float, List[Tuple[int, int]]]]:
    """
    A* search over a hex map addressed by axial (q, r) coordinates.

    map_data: dictionary mapping axial coordinates to tile objects; only each
        tile's ``terrain_type`` attribute is read.
    move_type: movement type used to look up step costs through ``get_cost``
        (the original notes say 0 for air, 1 for ground, 2 for sea, 3 for
        subwater). A step cost of 0 is treated as impassable.
    start: either a single coordinate tuple (single-source search) or a list
        of coordinates (multi-source search).
    goal: the coordinate to reach, or None to explore every tile reachable
        within ``limit``.
    limit: nodes whose estimated total cost (cost so far plus the hex-distance
        heuristic to ``goal``) exceeds this value are not explored.
    return_cost: when True and a goal is given, return ``(cost, path)`` where
        ``cost`` is the accumulated cost of the returned path.

    Single-source search charges the terrain of the tile being entered and
    returns the tiles after ``start`` up to and including ``goal``.

    Multi-source search charges the terrain of the tile being expanded, i.e.
    it is meant to be run backwards from candidate destinations towards a
    traveller standing on ``goal``. It returns the tiles after ``goal`` up to
    and including the source that was reached; if ``goal`` is itself one of
    the sources the result is ``[goal]``.

    With ``goal=None`` the return value is the list of expanded coordinates
    (including the start tiles), in arbitrary order.

    Ties between equally cheap predecessors are broken uniformly at random
    (reservoir sampling with ``np.random``), so equal-cost paths may differ
    between calls.

    Raises ValueError if ``start`` is neither a tuple nor a list, or if a goal
    was given and cannot be reached within ``limit``.
    """
    if isinstance(start, tuple):
        start = [start]
        multi_source = False
    elif isinstance(start, list):
        multi_source = True
    else:
        raise ValueError(f"Invalid start type. Found{type(start)} but expected tuple or list.")

    has_goal = goal is not None
    sources = set(start)

    g = {}
    parent = {}
    cand_parent_count = {}
    open_list = []

    for pos in start:
        g[pos] = 0
        parent[pos] = None
        cand_parent_count[pos] = 1
        open_list.append((0.0, pos))

    heapq.heapify(open_list)
    closed_list = set()

    directions = [(1, 0), (1, -1), (0, -1), (-1, 0), (-1, 1), (0, 1)]

    while open_list:
        _, cur = heapq.heappop(open_list)

        # A node is pushed again whenever a cheaper route to it is found, so
        # the heap may hold outdated entries (or duplicated sources). The
        # first pop of a node carries its best priority; later pops are stale.
        if cur in closed_list:
            continue

        if has_goal and cur == goal:
            # Capture the real path cost before `cur` is reused for backtracking.
            total_cost = g[cur]
            path = []
            while cur not in sources:
                path.append(cur)
                cur = parent[cur]
            if multi_source:
                if path:
                    path.pop(0)  # drop the tile the traveller is currently standing on
                path.append(cur)  # the source that was reached is the end of the route
            else:
                path.reverse()
            return (total_cost, path) if return_cost else path

        closed_list.add(cur)

        for dq, dr in directions:
            neighbor = (cur[0] + dq, cur[1] + dr)
            if neighbor not in map_data or neighbor in closed_list:
                continue

            # Backwards (multi-source) search pays for the tile being left,
            # because the real traveller would be entering it.
            charged_tile = map_data[cur] if multi_source else map_data[neighbor]
            cost = get_cost(move_type, charged_tile.terrain_type)
            if cost == 0:
                continue

            tentative_g = g[cur] + cost
            known_g = g.get(neighbor)

            if known_g is None or tentative_g < known_g:
                estimate = tentative_g + (get_axial_dis(neighbor, goal) if has_goal else 0)
                if estimate > limit:
                    continue
                g[neighbor] = tentative_g
                parent[neighbor] = cur
                cand_parent_count[neighbor] = 1
                # Always push: leaving the old entry in place would make the
                # node surface at its outdated, more expensive priority.
                heapq.heappush(open_list, (estimate, neighbor))
            elif tentative_g == known_g:
                cand_parent_count[neighbor] += 1
                if np.random.random() < 1 / cand_parent_count[neighbor]:  # reservoir sampling
                    parent[neighbor] = cur

    if has_goal:
        raise ValueError("No path found")

    return list(closed_list)
|
|
|
|
def get_path(agent, map_data, start, des: Union[Tuple[int, int], List[Tuple[int, int]]], limit=float('inf')) -> List[Tuple[int, int]]:
    """
    Plan a route for ``agent`` from ``start`` to ``des`` and split it into
    per-turn waypoints.

    agent: object providing ``move_type``, ``fuel`` and ``mobility``.
    map_data: dictionary mapping axial coordinates to tiles with a
        ``terrain_type`` attribute.
    start: the coordinate the route begins at.
    des: a single destination tuple, or a list of candidate destinations (the
        search then runs multi-source from the candidates back to ``start``).
    limit: forwarded to ``astar_search`` as its cost limit.

    Returns the coordinates at which each movement leg ends: a waypoint is
    emitted just before the step that would push the accumulated cost of the
    current leg above ``agent.mobility``, and the final tile of the route is
    always the last element.

    Raises ValueError if ``des`` is neither a tuple nor a list, if no route is
    found (including an empty route), or if the cost of the whole route
    exceeds ``agent.fuel``.
    """
    if isinstance(des, tuple):
        raw_path = astar_search(map_data=map_data, move_type=agent.move_type, start=start, goal=des, limit=limit)
    elif isinstance(des, list):  # multi-source
        raw_path = astar_search(map_data=map_data, move_type=agent.move_type, start=des, goal=start, limit=limit)
    else:
        raise ValueError(f"Invalid destination type. Found{type(des)} but expected tuple or list.")

    if not raw_path:
        raise ValueError("No path found")

    # Break the route into legs that each fit within the agent's mobility.
    raw_path = [start] + raw_path
    waypoints = []
    leg_cost = 0
    total_cost = 0

    for previous, node in zip(raw_path[:-1], raw_path[1:]):
        step_cost = get_cost(agent.move_type, map_data[node].terrain_type)
        leg_cost += step_cost
        total_cost += step_cost
        # Fuel is checked against the whole trip; the per-leg counter resets
        # at every waypoint and would hide the cumulative consumption.
        if total_cost > agent.fuel:
            raise ValueError(f"Not enough fuel to reach {des}")
        if leg_cost > agent.mobility:
            # This step does not fit in the current leg: stop on the previous
            # tile and start a new leg with this step.
            leg_cost = step_cost
            waypoints.append(previous)

    waypoints.append(raw_path[-1])
    return waypoints
|
|
|
|
def axial_to_cube_dict(a):
    """Convert an axial (q, r) pair to a cube-coordinate dict with keys x, y, z.

    x is q, z is r, and y is chosen so that x + y + z == 0.
    """
    q, r = a
    return dict(x=q, y=-(q + r), z=r)
|
|
|
|
def cube_dict_to_axial(c):
    """Convert a cube-coordinate dict to an axial (q, r) tuple.

    Only the "x" and "z" entries are read; "y" is ignored.
    """
    q = c["x"]
    r = c["z"]
    return (q, r)
|
|
|
|
def axial_to_cube(a):
    """Convert an axial (q, r) pair to a cube (x, y, z) tuple with x + y + z == 0."""
    x, z = a
    y = -(x + z)
    return (x, y, z)
|
|
|
|
def cube_to_axial(c):
    """Convert a cube (x, y, z) sequence to an axial (q, r) tuple.

    The first and third elements are kept; the second is ignored.
    """
    q = c[0]
    r = c[2]
    return (q, r)
|
|
|
|
def encode_axial(pos: Tuple[int, int]) -> int:
    """Map an axial (q, r) offset to its index on a hexagonal spiral.

    The origin is index 0. Ring ``d`` (all tiles at hex distance ``d`` from
    the origin) occupies the ``6 * d`` consecutive indices starting at
    ``3 * d * (d - 1) + 1``. Within a ring the numbering starts at (d, 0) and
    walks the six sides in the order tested below.

    The ring number is computed with integer arithmetic so that the result is
    always an ``int`` for integer coordinates, as the return annotation
    promises.
    """
    q, r = pos
    if q == 0 and r == 0:
        return 0

    # Cube coordinates of the tile; x + y + z == 0.
    x, y, z = q, -(q + r), r
    # For cube coordinates the distance to the origin is the largest absolute
    # component, which stays an integer (no division involved).
    d = max(abs(x), abs(y), abs(z))

    base_number = 3 * d * (d - 1) + 1

    if x > 0 and y < 0 and z >= 0:  # side 0
        return base_number + z
    elif x <= 0 and y < 0 and z > 0:  # side 1
        return base_number + d - x
    elif x < 0 and y >= 0 and z > 0:  # side 2
        return base_number + d * 2 + y
    elif x < 0 and y > 0 and z <= 0:  # side 3
        return base_number + d * 3 - z
    elif x >= 0 and y > 0 and z < 0:  # side 4
        return base_number + d * 4 + x
    else:  # side 5: x > 0 and y <= 0 and z < 0
        return base_number + d * 5 - y
|
|
|
|
def decode_axial(num: int) -> Tuple[int, int]:
    """Map a hexagonal-spiral index back to its axial (q, r) offset.

    Index 0 is the origin. Ring ``d`` holds ``6 * d`` indices, so rings 1..d
    together hold ``3 * d * (d + 1)`` indices; the ring of ``num`` is the
    smallest ``d`` for which that count reaches ``num``.
    """
    if num == 0:
        return (0, 0)

    # Binary search for the ring instead of walking ring by ring.
    low, high = 1, num
    while low < high:
        middle = (low + high) // 2
        # 3 * middle * (middle + 1) is the number of tiles in rings 1..middle.
        if 3 * middle * (middle + 1) + 1 > num:
            high = middle
        else:
            low = middle + 1
    ring = low

    # First index of this ring: one past everything in rings 1..ring-1.
    ring_start = 3 * (ring - 1) * ((ring - 1) + 1) + 1

    steps = num - ring_start
    side = steps // ring    # which of the six sides of the ring
    offset = steps % ring   # how far along that side

    if side == 0:
        return (ring - offset, offset)
    if side == 1:
        return (-offset, ring)
    if side == 2:
        return (-ring, ring - offset)
    if side == 3:
        return (-(ring - offset), -offset)
    if side == 4:
        return (offset, -ring)
    # side == 5
    return (ring, -(ring - offset))
|
|
|
|
def get_adj_pos(pos: Tuple[int, int], max_dis: int) -> List[Tuple[int, int]]:
    """
    Returns the positions around ``pos`` within ``max_dis`` rings.

    The offsets come from ``decode_axial`` for spiral indices 1 up to
    ``range_to_count(int(max_dis))``, so ``pos`` itself (index 0) is not part
    of the result.
    """
    last_index = range_to_count(int(max_dis))
    neighbours = []
    for index in range(1, last_index + 1):
        dq, dr = decode_axial(index)
        neighbours.append((pos[0] + dq, pos[1] + dr))
    return neighbours
|
|
|
|
def range_to_count(max_dis: int) -> int:
    """Return ``3 * max_dis * (max_dis + 1)``.

    Per the original note, the count does not include the centre tile (0).
    """
    tripled = 3 * max_dis
    return tripled * (max_dis + 1)
|
|
|
|
def dict_to_action(action_dict: dict) -> Action:
    """Build an ``Action`` from its dictionary form.

    "action_type" and "agent_id" are required keys; every other field falls
    back to the default listed below when it is absent.
    """
    action_type = ActionType.from_input(action_dict["action_type"])
    agent_id = action_dict["agent_id"]

    optional_defaults = {
        "target_id": "",
        "des": (-1, -1),
        "weapon_id": -1,
        "weapon_name": "",
        "start_time": 0,
        "end_type": None,
        "attack_count": 0,
    }
    optional_fields = {
        field: action_dict.get(field, default)
        for field, default in optional_defaults.items()
    }

    return Action(action_type=action_type, agent_id=agent_id, **optional_fields)
|
|
|
|
def dict_to_node(tile_dict: dict) -> TileNode:
    """Build a ``TileNode`` from its dictionary form.

    Only the "pos" entry (a mapping with "q" and "r") and "terrain_type" are
    read; any other entries in ``tile_dict`` are currently ignored.
    """
    position = (tile_dict["pos"]["q"], tile_dict["pos"]["r"])
    terrain = TerrainType.from_input(tile_dict["terrain_type"])
    return TileNode(pos=position, terrain_type=terrain)
|
|
|
|
def dict_to_weapon(weapon_dict: dict) -> Weapon:
    """Build a ``Weapon`` from its dictionary form.

    All of "name", "attack_range", "damage", "max_ammo" and "strike_types" are
    required; each strike type is converted with ``MoveType.from_input``.
    """
    name = weapon_dict["name"]
    attack_range = weapon_dict["attack_range"]
    damage = weapon_dict["damage"]
    max_ammo = weapon_dict["max_ammo"]

    strike_types = []
    for raw_type in weapon_dict["strike_types"]:
        strike_types.append(MoveType.from_input(raw_type))

    return Weapon(
        name=name,
        attack_range=attack_range,
        damage=damage,
        max_ammo=max_ammo,
        strike_types=strike_types,
    )
|
|
|
|
def dict_to_module(module_dict: dict) -> Module:
    """Build the module object described by ``module_dict``.

    "module_id" selects the class (Hangar, Supply or Transport).
    "available_types" may be missing or None, both of which yield an empty
    list; "capacity" defaults to 0.

    Raises ValueError for any other module type.
    """
    module_type = ModuleType.from_input(module_dict["module_id"])

    raw_types = module_dict.get("available_types", [])
    if raw_types is None:
        available_types = []
    else:
        available_types = [AgentType.from_input(raw_type) for raw_type in raw_types]

    capacity = module_dict.get("capacity", 0)

    if module_type == ModuleType.HANGAR:
        return Hangar(available_types=available_types, capacity=capacity)
    if module_type == ModuleType.SUPPLY:
        return Supply(available_types=available_types, capacity=capacity)
    if module_type == ModuleType.TRANSPORT:
        return Transport(available_types=available_types, capacity=capacity)
    raise ValueError(f"Invalid module type: {module_type}")
|
|
|
|
def dict_to_agent(agent_dict: dict) -> Agent:
    """Build an ``Agent`` from its dictionary form.

    Required keys: "agent_id", "type", "team_id", "faction_id", "move_type",
    "pos" (a mapping with "q" and "r"), "max_endurance", "defense",
    "max_fuel" and "mobility". "info_level" and "stealth_level" default to 0.

    "switchable_weapons" and "modules" are optional; a missing key and an
    explicit None are both treated as an empty list, so the two list fields
    behave the same way.
    """
    # `or []` covers both an absent key and an explicit None value.
    weapon_dicts = agent_dict.get("switchable_weapons") or []
    module_dicts = agent_dict.get("modules") or []

    return Agent(
        agent_id=agent_dict["agent_id"],
        agent_type=AgentType.from_input(agent_dict["type"]),
        team_id=agent_dict["team_id"],
        faction_id=agent_dict["faction_id"],
        move_type=MoveType.from_input(agent_dict["move_type"]),
        pos=(agent_dict["pos"]["q"], agent_dict["pos"]["r"]),
        info_level=agent_dict.get("info_level", 0),
        stealth_level=agent_dict.get("stealth_level", 0),
        max_endurance=agent_dict["max_endurance"],
        defense=agent_dict["defense"],
        max_fuel=agent_dict["max_fuel"],
        mobility=agent_dict["mobility"],
        switchable_weapons=[dict_to_weapon(weapon_dict) for weapon_dict in weapon_dicts],
        modules=[dict_to_module(module_dict) for module_dict in module_dicts],
    )
|
|
|
|
|
|
def json_to_markdown(json_data):
    """Render JSON-like data as a nested markdown bullet list.

    A string argument is first parsed with ``json.loads``. Dictionaries become
    ``- key: value`` bullets, with nested dicts and lists rendered one indent
    level deeper on the following lines. List items are rendered at the
    list's own indent level. Anything that is neither a dict nor a list at the
    top level is returned as ``str(json_data)``.
    """
    if isinstance(json_data, str):
        json_data = json.loads(json_data)

    def render_mapping(mapping, depth):
        # Yield the text fragments for every key of a dictionary.
        pad = " " * depth
        for key, value in mapping.items():
            if isinstance(value, dict):
                yield f"{pad}- {key}: \n"
                yield from render_mapping(value, depth + 1)
            elif isinstance(value, list):
                yield f"{pad}- {key}: \n"
                yield from render_sequence(value, depth + 1)
            else:
                yield f"{pad}- {key}: {value}\n"

    def render_sequence(items, depth):
        # Yield the text fragments for every item of a list.
        pad = " " * depth
        for item in items:
            if isinstance(item, dict):
                yield from render_mapping(item, depth)
            elif isinstance(item, list):
                yield from render_sequence(item, depth)
            else:
                yield f"{pad}- {item}\n"

    if isinstance(json_data, dict):
        return "".join(render_mapping(json_data, 0))
    if isinstance(json_data, list):
        return "".join(render_sequence(json_data, 0))
    return str(json_data)
|
|
|
|
|
|
def increase_markdown_indent(markdown_str, levels=1, use_tabs=False):
    """Prefix every line of ``markdown_str`` with ``levels`` indent units.

    The indent unit is a tab when ``use_tabs`` is true and a space otherwise.
    Lines are split on "\\n", so an empty trailing line (after a final
    newline) receives the prefix as well.
    """
    unit = "\t" if use_tabs else " "
    prefix = unit * levels
    return "\n".join(prefix + row for row in markdown_str.split("\n"))
|
|
|
|
|
|
def agent_list_to_markdown(agent_list):
    """Render agents as markdown sections.

    Each agent contributes a ``### Agent <agent_id>`` heading followed by the
    markdown rendering of ``agent.to_dict()`` and a blank line.
    """
    sections = [
        f"### Agent {agent.agent_id}\n" + json_to_markdown(agent.to_dict()) + "\n"
        for agent in agent_list
    ]
    return "".join(sections)
|
|
|
|
def battlefield_list_to_markdown(battlefield_list):
    """Render battlefield dictionaries as markdown sections.

    Each entry contributes a ``### Battlefield <sectorName>`` heading followed
    by the markdown rendering of the whole dictionary and a blank line.
    """
    sections = [
        f"### Battlefield {battlefield['sectorName']}\n" + json_to_markdown(battlefield) + "\n"
        for battlefield in battlefield_list
    ]
    return "".join(sections)