yes_cmdr/common/utils.py

368 lines
13 KiB
Python

import heapq
from .agent import Agent, Weapon, Action, TileNode, Module, Supply, Hangar, Transport
from .agent import MoveType, TerrainType, ActionType, AgentType, ModuleType
from typing import Tuple, List, Dict, Union
import numpy as np
import json
# Define the cost matrix for agent types and terrain types
cost_matrix = [
# Plain Road Sea Hill
[ 1, 1, 1, 1], # Air
[ 1.5, 1, 0, 2], # Ground
[ 0, 0, 1, 0], # Sea
[ 0, 0, 1, 0], # Subwater
]
def get_cost(move_type: MoveType, terrain_type: TerrainType):
return cost_matrix[move_type.value][terrain_type.value]
def get_axial_dis(pos1: Tuple[int, int], pos2: Tuple[int, int] = (0, 0)):
q1, r1 = pos1
q2, r2 = pos2
return (abs(q1 - q2) + abs(r1 - r2) + abs((q1 + r1) - (q2 + r2))) / 2
def astar_search(map_data: Dict[Tuple[int, int], object],
move_type: MoveType,
start: Tuple[int, int],
goal: Tuple[int, int] = None,
limit: float = float('inf'),
return_cost: bool = False) -> Union[List[Tuple[int, int]], Tuple[float, List[Tuple[int, int]]]]:
"""
A* search algorithm to find the shortest path from start to goal.
map_data: a dictionary of nodes, where the keys are the node coordinates and the values are the node objects
move_type: the type of agent to consider (0 for air, 1 for ground, 2 for sea, 3 for subwater)
start: the starting node
goal: the ending node
"""
if isinstance(start, tuple):
start = [start]
multi_source = False
elif isinstance(start, list):
multi_source = True
else:
raise ValueError(f"Invalid start type. Found{type(start)} but expected tuple or list.")
g = {}
f = {}
parent = {}
open_list = []
cand_parent_count = {}
for pos in start:
g[pos] = 0
f[pos] = 0
parent[pos] = None
cand_parent_count[pos] = 1
open_list.append((0.0, pos))
in_open_list = set(start)
heapq.heapify(open_list)
closed_list = set()
directions = [(1, 0), (1, -1), (0, -1), (-1, 0), (-1, 1), (0, 1)]
while open_list:
_, cur = heapq.heappop(open_list)
in_open_list.remove(cur)
if goal and cur == goal:
path = []
while cur not in start:
path.append(cur)
cur = parent[cur]
if multi_source:
if path:
path.pop(0) # 弹出实际上当前所在位置
path.append(cur) # 确定多源最短路的终点
else:
path.reverse()
return (cost, path) if return_cost else path
closed_list.add(cur)
for dq, dr in directions:
neighbor = (cur[0] + dq, cur[1] + dr)
if neighbor in map_data and neighbor not in closed_list:
cost = get_cost(move_type, map_data[neighbor].terrain_type) \
if not multi_source \
else get_cost(move_type, map_data[cur].terrain_type)
if cost == 0:
continue
tentative_g = g[cur] + cost
if neighbor not in g or tentative_g < g[neighbor]:
g[neighbor] = tentative_g
f[neighbor] = tentative_g + (get_axial_dis(neighbor, goal) if goal else 0)
if f[neighbor] > limit:
continue
parent[neighbor] = cur
cand_parent_count[neighbor] = 1
if neighbor not in in_open_list:
heapq.heappush(open_list, (f[neighbor], neighbor))
in_open_list.add(neighbor)
elif neighbor in g and tentative_g == g[neighbor] and f[neighbor] <= limit:
cand_parent_count[neighbor] += 1
if np.random.random() < 1 / cand_parent_count[neighbor]: # reservior sampling
parent[neighbor] = cur
if goal is not None:
raise ValueError("No path found")
return list(closed_list)
def get_path(agent, map_data, start, des: Union[Tuple[int, int], List[Tuple[int, int]]], limit=float('inf')) -> List[Tuple[int, int]]:
"""
Returns a list of move actions to move the agent from its current position to the destination.
"""
if isinstance(des, tuple):
raw_path = astar_search(map_data=map_data, move_type=agent.move_type, start=start, goal=des, limit=limit)
elif isinstance(des, list): # multi-source
raw_path = astar_search(map_data=map_data, move_type=agent.move_type, start=des, goal=start, limit=limit)
else:
raise ValueError(f"Invalid destination type. Found{type(des)} but expected tuple or list.")
if not raw_path:
raise ValueError("No path found")
# Create the frame path by breaking the path into smaller moves
path = []
sum = 0
raw_path = [start] + raw_path
# print(agent.pos, des, raw_path)
for parent, node in zip(raw_path[:-1], raw_path[1:]):
cost = get_cost(agent.move_type, map_data[node].terrain_type)
sum += cost
if sum > agent.fuel:
raise ValueError(f"Not enough fuel to reach {des}")
if sum > agent.mobility:
sum = cost
path.append(parent)
path.append(node)
return path
def axial_to_cube_dict(a):
q, r = a
return {
"x": q,
"y": -(q + r),
"z": r
}
def cube_dict_to_axial(c):
return (c["x"], c["z"])
def axial_to_cube(a):
q, r = a
return (q, -(q + r), r)
def cube_to_axial(c):
return (c[0], c[2])
def encode_axial(pos: Tuple[int, int]) -> int:
q, r = pos
if q == 0 and r == 0:
return 0
x, y, z = axial_to_cube(pos)
d = get_axial_dis(pos)
base_number = 3 * d * (d - 1) + 1
if x > 0 and y < 0 and z >= 0: # 0
return base_number + z
elif x <= 0 and y < 0 and z > 0: # 1
return base_number + d - x
elif x < 0 and y >= 0 and z > 0: # 2
return base_number + d * 2 + y
elif x < 0 and y > 0 and z <= 0: # 3
return base_number + d * 3 - z
elif x >= 0 and y > 0 and z < 0: # 4
return base_number + d * 4 + x
else: # x > 0 and y <= 0 and z < 0 # 5
return base_number + d * 5 - y
def decode_axial(num: int) -> Tuple[int, int]:
if num == 0:
return (0, 0)
l, r = 1, num
while l < r:
mid = (l + r) // 2
if range_to_count(mid) + 1 > num:
r = mid
else:
l = mid + 1
d = l
base_number = range_to_count(d - 1) + 1
# base_number = 1
# d = 1
# while base_number + 6 * d <= num:
# base_number += 6 * d
# d += 1
k = (num - base_number) // d
mod = (num - base_number) % d
if k == 0:
q = d - mod
r = mod
elif k == 1:
q = -mod
r = d
elif k == 2:
q = -d
r = d - mod
elif k == 3:
q = -(d - mod)
r = -mod
elif k == 4:
q = mod
r = -d
else: # k == 5
q = d
r = -(d - mod)
return (q, r)
def get_adj_pos(pos: Tuple[int, int], max_dis: int) -> List[Tuple[int, int]]:
"""
Returns a list of adjacent positons within the given range.
"""
d = [decode_axial(num) for num in range(1, range_to_count(int(max_dis)) + 1)]
return [(pos[0] + dq, pos[1] + dr) for dq, dr in d]
def range_to_count(max_dis: int) -> int: # not including 0
return 3 * max_dis * (max_dis + 1)
def dict_to_action(action_dict: dict) -> Action:
return Action(
action_type=ActionType.from_input(action_dict["action_type"]),
agent_id=action_dict["agent_id"],
target_id=action_dict.get("target_id", ""),
des=action_dict.get("des", (-1, -1)),
weapon_id=action_dict.get("weapon_id", -1),
weapon_name=action_dict.get("weapon_name", ""),
start_time=action_dict.get("start_time", 0),
end_type=action_dict.get("end_type", None),
attack_count=action_dict.get("attack_count", 0),
)
def dict_to_node(tile_dict: dict) -> TileNode:
return TileNode(
pos=(tile_dict["pos"]["q"], tile_dict["pos"]["r"]),
terrain_type=TerrainType.from_input(tile_dict["terrain_type"]),
# agent_id=tile_dict.get("agent_id", None),
# is_city=tile_dict.get("is_city", False),
# chaotic_value=tile_dict.get("chaotic_value", 0),
# occupy_value=tile_dict.get("occupy_value", 0),
# occupied_by=tile_dict.get("occupied_by", None)
)
def dict_to_weapon(weapon_dict: dict) -> Weapon:
return Weapon(
name=weapon_dict["name"],
attack_range=weapon_dict["attack_range"],
damage=weapon_dict["damage"],
max_ammo=weapon_dict["max_ammo"],
strike_types=[MoveType.from_input(strike_type) for strike_type in weapon_dict["strike_types"]]
)
def dict_to_module(module_dict: dict) -> Module:
module_type = ModuleType.from_input(module_dict["module_id"])
available_types = [AgentType.from_input(agent_type) for agent_type in module_dict.get("available_types", [])] \
if module_dict.get("available_types", []) is not None else []
capacity = module_dict.get("capacity", 0)
if module_type == ModuleType.HANGAR:
return Hangar(available_types=available_types, capacity=capacity)
elif module_type == ModuleType.SUPPLY:
return Supply(available_types=available_types, capacity=capacity)
elif module_type == ModuleType.TRANSPORT:
return Transport(available_types=available_types, capacity=capacity)
else:
raise ValueError(f"Invalid module type: {module_type}")
def dict_to_agent(agent_dict: dict) -> Agent:
return Agent(
agent_id=agent_dict["agent_id"],
agent_type=AgentType.from_input(agent_dict["type"]),
team_id=agent_dict["team_id"],
faction_id=agent_dict["faction_id"],
move_type=MoveType.from_input(agent_dict["move_type"]),
pos=(agent_dict["pos"]["q"], agent_dict["pos"]["r"]),
info_level=agent_dict.get("info_level", 0),
stealth_level=agent_dict.get("stealth_level", 0),
max_endurance=agent_dict["max_endurance"],
defense=agent_dict["defense"],
max_fuel=agent_dict["max_fuel"],
mobility=agent_dict["mobility"],
switchable_weapons=[dict_to_weapon(weapon_dict) for weapon_dict in agent_dict.get("switchable_weapons", [])],
modules=[dict_to_module(module_dict) for module_dict in agent_dict.get("modules", [])] if agent_dict.get("modules", []) is not None else []
)
def json_to_markdown(json_data):
# 解析JSON数据
if isinstance(json_data, str):
json_data = json.loads(json_data)
# 递归转换字典
def dict_to_markdown(d, indent=0):
markdown = ""
for key, value in d.items():
markdown += " " * indent + f"- {key}: "
if isinstance(value, dict):
markdown += "\n" + dict_to_markdown(value, indent + 1)
elif isinstance(value, list):
markdown += "\n" + list_to_markdown(value, indent + 1)
else:
markdown += f"{value}\n"
return markdown
# 递归转换列表
def list_to_markdown(lst, indent=0):
markdown = ""
for item in lst:
if isinstance(item, dict):
markdown += dict_to_markdown(item, indent)
elif isinstance(item, list):
markdown += list_to_markdown(item, indent)
else:
markdown += " " * indent + f"- {item}\n"
return markdown
# 判断是否是字典或列表
if isinstance(json_data, dict):
return dict_to_markdown(json_data)
elif isinstance(json_data, list):
return list_to_markdown(json_data)
else:
return str(json_data)
def increase_markdown_indent(markdown_str, levels=1, use_tabs=False):
# 根据需要使用空格或制表符来增加缩进
indent = "\t" * levels if use_tabs else " " * levels
# 将每一行加上缩进
lines = markdown_str.split("\n")
indented_lines = [indent + line for line in lines]
# 返回增加层级后的Markdown字符串
return "\n".join(indented_lines)
def agent_list_to_markdown(agent_list):
result = ""
for agent in agent_list:
result += f"### Agent {agent.agent_id}\n"
result += json_to_markdown(agent.to_dict()) + "\n"
return result
def battlefield_list_to_markdown(battlefield_list):
result = ""
for battlefield in battlefield_list:
result += f"### Battlefield {battlefield['sectorName']}\n"
result += json_to_markdown(battlefield) + "\n"
return result