yes_cmdr/common/utils.py

368 lines
13 KiB
Python
Raw Permalink Normal View History

2024-12-18 21:30:53 +08:00
import heapq
2024-12-20 19:13:37 +08:00
from .agent import Agent, Weapon, Action, TileNode, Module, Supply, Hangar, Transport
2024-12-18 21:30:53 +08:00
from .agent import MoveType, TerrainType, ActionType, AgentType, ModuleType
from typing import Tuple, List, Dict, Union
import numpy as np
2024-12-20 19:13:37 +08:00
import json
2024-12-18 21:30:53 +08:00
# Define the cost matrix for agent types and terrain types
cost_matrix = [
# Plain Road Sea Hill
[ 1, 1, 1, 1], # Air
[ 1.5, 1, 0, 2], # Ground
[ 0, 0, 1, 0], # Sea
[ 0, 0, 1, 0], # Subwater
]
def get_cost(move_type: MoveType, terrain_type: TerrainType):
return cost_matrix[move_type.value][terrain_type.value]
def get_axial_dis(pos1: Tuple[int, int], pos2: Tuple[int, int] = (0, 0)):
q1, r1 = pos1
q2, r2 = pos2
return (abs(q1 - q2) + abs(r1 - r2) + abs((q1 + r1) - (q2 + r2))) / 2
def astar_search(map_data: Dict[Tuple[int, int], object],
move_type: MoveType,
start: Tuple[int, int],
goal: Tuple[int, int] = None,
limit: float = float('inf'),
return_cost: bool = False) -> Union[List[Tuple[int, int]], Tuple[float, List[Tuple[int, int]]]]:
"""
A* search algorithm to find the shortest path from start to goal.
map_data: a dictionary of nodes, where the keys are the node coordinates and the values are the node objects
move_type: the type of agent to consider (0 for air, 1 for ground, 2 for sea, 3 for subwater)
start: the starting node
goal: the ending node
"""
if isinstance(start, tuple):
start = [start]
multi_source = False
elif isinstance(start, list):
multi_source = True
else:
raise ValueError(f"Invalid start type. Found{type(start)} but expected tuple or list.")
g = {}
f = {}
parent = {}
open_list = []
cand_parent_count = {}
for pos in start:
g[pos] = 0
f[pos] = 0
parent[pos] = None
cand_parent_count[pos] = 1
open_list.append((0.0, pos))
in_open_list = set(start)
heapq.heapify(open_list)
closed_list = set()
directions = [(1, 0), (1, -1), (0, -1), (-1, 0), (-1, 1), (0, 1)]
while open_list:
_, cur = heapq.heappop(open_list)
in_open_list.remove(cur)
if goal and cur == goal:
path = []
while cur not in start:
path.append(cur)
cur = parent[cur]
if multi_source:
if path:
path.pop(0) # 弹出实际上当前所在位置
path.append(cur) # 确定多源最短路的终点
else:
path.reverse()
return (cost, path) if return_cost else path
closed_list.add(cur)
for dq, dr in directions:
neighbor = (cur[0] + dq, cur[1] + dr)
if neighbor in map_data and neighbor not in closed_list:
cost = get_cost(move_type, map_data[neighbor].terrain_type) \
if not multi_source \
else get_cost(move_type, map_data[cur].terrain_type)
if cost == 0:
continue
tentative_g = g[cur] + cost
if neighbor not in g or tentative_g < g[neighbor]:
g[neighbor] = tentative_g
f[neighbor] = tentative_g + (get_axial_dis(neighbor, goal) if goal else 0)
if f[neighbor] > limit:
continue
parent[neighbor] = cur
cand_parent_count[neighbor] = 1
if neighbor not in in_open_list:
heapq.heappush(open_list, (f[neighbor], neighbor))
in_open_list.add(neighbor)
elif neighbor in g and tentative_g == g[neighbor] and f[neighbor] <= limit:
cand_parent_count[neighbor] += 1
if np.random.random() < 1 / cand_parent_count[neighbor]: # reservior sampling
parent[neighbor] = cur
if goal is not None:
raise ValueError("No path found")
return list(closed_list)
def get_path(agent, map_data, start, des: Union[Tuple[int, int], List[Tuple[int, int]]], limit=float('inf')) -> List[Tuple[int, int]]:
"""
Returns a list of move actions to move the agent from its current position to the destination.
"""
if isinstance(des, tuple):
raw_path = astar_search(map_data=map_data, move_type=agent.move_type, start=start, goal=des, limit=limit)
elif isinstance(des, list): # multi-source
raw_path = astar_search(map_data=map_data, move_type=agent.move_type, start=des, goal=start, limit=limit)
else:
raise ValueError(f"Invalid destination type. Found{type(des)} but expected tuple or list.")
if not raw_path:
raise ValueError("No path found")
# Create the frame path by breaking the path into smaller moves
path = []
sum = 0
raw_path = [start] + raw_path
# print(agent.pos, des, raw_path)
for parent, node in zip(raw_path[:-1], raw_path[1:]):
cost = get_cost(agent.move_type, map_data[node].terrain_type)
sum += cost
if sum > agent.fuel:
raise ValueError(f"Not enough fuel to reach {des}")
if sum > agent.mobility:
sum = cost
path.append(parent)
path.append(node)
return path
def axial_to_cube_dict(a):
q, r = a
return {
"x": q,
"y": -(q + r),
"z": r
}
def cube_dict_to_axial(c):
return (c["x"], c["z"])
def axial_to_cube(a):
q, r = a
return (q, -(q + r), r)
def cube_to_axial(c):
return (c[0], c[2])
def encode_axial(pos: Tuple[int, int]) -> int:
q, r = pos
if q == 0 and r == 0:
return 0
x, y, z = axial_to_cube(pos)
d = get_axial_dis(pos)
base_number = 3 * d * (d - 1) + 1
if x > 0 and y < 0 and z >= 0: # 0
return base_number + z
elif x <= 0 and y < 0 and z > 0: # 1
return base_number + d - x
elif x < 0 and y >= 0 and z > 0: # 2
return base_number + d * 2 + y
elif x < 0 and y > 0 and z <= 0: # 3
return base_number + d * 3 - z
elif x >= 0 and y > 0 and z < 0: # 4
return base_number + d * 4 + x
else: # x > 0 and y <= 0 and z < 0 # 5
return base_number + d * 5 - y
def decode_axial(num: int) -> Tuple[int, int]:
if num == 0:
return (0, 0)
l, r = 1, num
while l < r:
mid = (l + r) // 2
if range_to_count(mid) + 1 > num:
r = mid
else:
l = mid + 1
d = l
base_number = range_to_count(d - 1) + 1
# base_number = 1
# d = 1
# while base_number + 6 * d <= num:
# base_number += 6 * d
# d += 1
k = (num - base_number) // d
mod = (num - base_number) % d
if k == 0:
q = d - mod
r = mod
elif k == 1:
q = -mod
r = d
elif k == 2:
q = -d
r = d - mod
elif k == 3:
q = -(d - mod)
r = -mod
elif k == 4:
q = mod
r = -d
else: # k == 5
q = d
r = -(d - mod)
return (q, r)
def get_adj_pos(pos: Tuple[int, int], max_dis: int) -> List[Tuple[int, int]]:
"""
Returns a list of adjacent positons within the given range.
"""
d = [decode_axial(num) for num in range(1, range_to_count(int(max_dis)) + 1)]
return [(pos[0] + dq, pos[1] + dr) for dq, dr in d]
def range_to_count(max_dis: int) -> int: # not including 0
return 3 * max_dis * (max_dis + 1)
def dict_to_action(action_dict: dict) -> Action:
return Action(
action_type=ActionType.from_input(action_dict["action_type"]),
agent_id=action_dict["agent_id"],
target_id=action_dict.get("target_id", ""),
des=action_dict.get("des", (-1, -1)),
weapon_id=action_dict.get("weapon_id", -1),
weapon_name=action_dict.get("weapon_name", ""),
start_time=action_dict.get("start_time", 0),
end_type=action_dict.get("end_type", None),
attack_count=action_dict.get("attack_count", 0),
)
def dict_to_node(tile_dict: dict) -> TileNode:
return TileNode(
pos=(tile_dict["pos"]["q"], tile_dict["pos"]["r"]),
terrain_type=TerrainType.from_input(tile_dict["terrain_type"]),
# agent_id=tile_dict.get("agent_id", None),
# is_city=tile_dict.get("is_city", False),
# chaotic_value=tile_dict.get("chaotic_value", 0),
# occupy_value=tile_dict.get("occupy_value", 0),
# occupied_by=tile_dict.get("occupied_by", None)
)
def dict_to_weapon(weapon_dict: dict) -> Weapon:
return Weapon(
name=weapon_dict["name"],
attack_range=weapon_dict["attack_range"],
damage=weapon_dict["damage"],
max_ammo=weapon_dict["max_ammo"],
strike_types=[MoveType.from_input(strike_type) for strike_type in weapon_dict["strike_types"]]
)
def dict_to_module(module_dict: dict) -> Module:
module_type = ModuleType.from_input(module_dict["module_id"])
available_types = [AgentType.from_input(agent_type) for agent_type in module_dict.get("available_types", [])] \
if module_dict.get("available_types", []) is not None else []
capacity = module_dict.get("capacity", 0)
2024-12-20 19:13:37 +08:00
if module_type == ModuleType.HANGAR:
return Hangar(available_types=available_types, capacity=capacity)
2024-12-18 21:30:53 +08:00
elif module_type == ModuleType.SUPPLY:
return Supply(available_types=available_types, capacity=capacity)
elif module_type == ModuleType.TRANSPORT:
return Transport(available_types=available_types, capacity=capacity)
else:
raise ValueError(f"Invalid module type: {module_type}")
def dict_to_agent(agent_dict: dict) -> Agent:
return Agent(
agent_id=agent_dict["agent_id"],
agent_type=AgentType.from_input(agent_dict["type"]),
team_id=agent_dict["team_id"],
faction_id=agent_dict["faction_id"],
move_type=MoveType.from_input(agent_dict["move_type"]),
pos=(agent_dict["pos"]["q"], agent_dict["pos"]["r"]),
info_level=agent_dict.get("info_level", 0),
stealth_level=agent_dict.get("stealth_level", 0),
max_endurance=agent_dict["max_endurance"],
defense=agent_dict["defense"],
max_fuel=agent_dict["max_fuel"],
mobility=agent_dict["mobility"],
switchable_weapons=[dict_to_weapon(weapon_dict) for weapon_dict in agent_dict.get("switchable_weapons", [])],
modules=[dict_to_module(module_dict) for module_dict in agent_dict.get("modules", [])] if agent_dict.get("modules", []) is not None else []
2024-12-20 19:13:37 +08:00
)
def json_to_markdown(json_data):
# 解析JSON数据
if isinstance(json_data, str):
json_data = json.loads(json_data)
# 递归转换字典
def dict_to_markdown(d, indent=0):
markdown = ""
for key, value in d.items():
markdown += " " * indent + f"- {key}: "
if isinstance(value, dict):
markdown += "\n" + dict_to_markdown(value, indent + 1)
elif isinstance(value, list):
markdown += "\n" + list_to_markdown(value, indent + 1)
else:
markdown += f"{value}\n"
return markdown
# 递归转换列表
def list_to_markdown(lst, indent=0):
markdown = ""
for item in lst:
if isinstance(item, dict):
markdown += dict_to_markdown(item, indent)
elif isinstance(item, list):
markdown += list_to_markdown(item, indent)
else:
markdown += " " * indent + f"- {item}\n"
return markdown
# 判断是否是字典或列表
if isinstance(json_data, dict):
return dict_to_markdown(json_data)
elif isinstance(json_data, list):
return list_to_markdown(json_data)
else:
return str(json_data)
def increase_markdown_indent(markdown_str, levels=1, use_tabs=False):
# 根据需要使用空格或制表符来增加缩进
indent = "\t" * levels if use_tabs else " " * levels
# 将每一行加上缩进
lines = markdown_str.split("\n")
indented_lines = [indent + line for line in lines]
# 返回增加层级后的Markdown字符串
return "\n".join(indented_lines)
def agent_list_to_markdown(agent_list):
result = ""
for agent in agent_list:
result += f"### Agent {agent.agent_id}\n"
result += json_to_markdown(agent.to_dict()) + "\n"
return result
def battlefield_list_to_markdown(battlefield_list):
result = ""
for battlefield in battlefield_list:
result += f"### Battlefield {battlefield['sectorName']}\n"
result += json_to_markdown(battlefield) + "\n"
return result