commit 33ced41bc03bc20288d8fe643d7bb4f4d06328a7 Author: zuchaoli Date: Wed Dec 18 21:30:53 2024 +0800 sumbit diff --git a/__init__.py b/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/__pycache__/__init__.cpython-310.pyc b/__pycache__/__init__.cpython-310.pyc new file mode 100644 index 0000000..90ac4db Binary files /dev/null and b/__pycache__/__init__.cpython-310.pyc differ diff --git a/__pycache__/yes_cmdr_env.cpython-310.pyc b/__pycache__/yes_cmdr_env.cpython-310.pyc new file mode 100644 index 0000000..2fe8ef9 Binary files /dev/null and b/__pycache__/yes_cmdr_env.cpython-310.pyc differ diff --git a/__pycache__/yes_cmdr_utils.cpython-310.pyc b/__pycache__/yes_cmdr_utils.cpython-310.pyc new file mode 100644 index 0000000..a713446 Binary files /dev/null and b/__pycache__/yes_cmdr_utils.cpython-310.pyc differ diff --git a/common/__pycache__/agent.cpython-310.pyc b/common/__pycache__/agent.cpython-310.pyc new file mode 100644 index 0000000..d9946c0 Binary files /dev/null and b/common/__pycache__/agent.cpython-310.pyc differ diff --git a/common/__pycache__/agent.cpython-39.pyc b/common/__pycache__/agent.cpython-39.pyc new file mode 100644 index 0000000..9e4b049 Binary files /dev/null and b/common/__pycache__/agent.cpython-39.pyc differ diff --git a/common/__pycache__/renderer.cpython-310.pyc b/common/__pycache__/renderer.cpython-310.pyc new file mode 100644 index 0000000..3cf8acf Binary files /dev/null and b/common/__pycache__/renderer.cpython-310.pyc differ diff --git a/common/__pycache__/renderer.cpython-39.pyc b/common/__pycache__/renderer.cpython-39.pyc new file mode 100644 index 0000000..e04bc98 Binary files /dev/null and b/common/__pycache__/renderer.cpython-39.pyc differ diff --git a/common/__pycache__/utils.cpython-310.pyc b/common/__pycache__/utils.cpython-310.pyc new file mode 100644 index 0000000..df59760 Binary files /dev/null and b/common/__pycache__/utils.cpython-310.pyc differ diff --git a/common/__pycache__/utils.cpython-39.pyc b/common/__pycache__/utils.cpython-39.pyc new file mode 100644 index 0000000..b0accf0 Binary files /dev/null and b/common/__pycache__/utils.cpython-39.pyc differ diff --git a/common/agent.py b/common/agent.py new file mode 100644 index 0000000..5f4dfcc --- /dev/null +++ b/common/agent.py @@ -0,0 +1,540 @@ +from abc import ABC +import copy +from enum import Enum +from typing import List, Tuple +import gymnasium as gym + +class MyEnum(Enum): + @classmethod + def from_value(cls, value): + """根据数值获取相应的枚举成员""" + for member in cls: + if member.value == value: + return member + raise ValueError(f"{value} is not a valid value for {cls.__name__}") + + @classmethod + def from_string(cls, name): + """根据字符串获取相应的枚举成员""" + try: + return cls[name] + except KeyError: + raise ValueError(f"{name} is not a valid {cls.__name__}") + + @classmethod + def from_input(cls, input): + """根据输入(字符串或数值)获取相应的枚举成员""" + if isinstance(input, str): + return cls.from_string(input) + elif isinstance(input, int): + return cls.from_value(input) + else: + raise ValueError("Input must be either a string or an integer.") + +class MoveType(MyEnum): + AIR = 0 + GROUND = 1 + SEA = 2 + SUBWATER = 3 + +class TerrainType(MyEnum): + PLAIN = 0 + ROAD = 1 + SEA = 2 + SUBWATER = 3 + +class ActionType(MyEnum): + ILLEGAL = -1 + END_OF_TURN = 0 + MOVE = 1 + ATTACK = 2 + INTERACT = 3 + RELEASE = 4 + SWITCH_WEAPON = 5 + SUPPLY = 6 + +class ActionStatus(MyEnum): + VALID = 0 + OCCUPIED_DES = 1 + OUT_OF_RANGE = 2 + +class AgentType(MyEnum): + Infantry = 0 # 步兵 + Tank = 1 # 装甲单位 + AntiAir = 2 # 自行防空炮 + MobilizedInfantry = 3 # 机械化步兵 + Helicopter = 4 # 直升机 + Fighter = 5 # 战斗机 + UAV = 6 # 无人机 + CombatShip = 7 # 战舰 + Carrier = 8 # 航母 + Artillery = 9 # 火炮 + Construction = 10 # 建筑 + MissileLauncher = 11 # 导弹发射车 + TransportHelicopter = 12 # 运输直升机 + SupplyTruck = 13 # 移动补给车 + Submarine = 14 # 潜艇 + AdvancedTank = 15 # 先进坦克 + Airport = 16 # 机场 + SupplyStation = 17 # 补给站 + RadarStation = 18 # 雷达站 + Bomber = 19 # 轰炸机 + AWACS = 20 # 预警机 + TransportShip = 21 # 运输船 + CommandPost = 22 # 指挥部 + +class ModuleType(MyEnum): + HANGER = 0 + SUPPLY = 1 + TRANSPORT = 2 + +class Weapon: + def __init__(self, name: str, attack_range: int, damage: float, max_ammo: int, ammo: int = None, strike_types: List[MoveType] = []): + self.name = name + self.attack_range = attack_range + self.damage = damage + self.max_ammo = max_ammo + self.ammo = ammo if ammo is not None else max_ammo + self.strike_types = strike_types + + def reset(self): + self.ammo = self.max_ammo + + def to_dict(self): + return { + "name": self.name, + "attack_range": self.attack_range, + "damage": self.damage, + "max_ammo": self.max_ammo, + "ammo": self.ammo, + "strike_types": [strike_type.name for strike_type in self.strike_types] + } + + def __deepcopy__(self, memo): + copied_strike_types = copy.deepcopy(self.strike_types, memo) + return Weapon(self.name, self.attack_range, self.damage, self.max_ammo, self.ammo, copied_strike_types) + +class Action: + def __init__(self, + action_type: ActionType=ActionType.ILLEGAL, + agent_id: str = "", + des: Tuple[int, int] = (-1, -1), + target_id: str = "", + weapon_id: int = 0, + weapon_name: str = "", + start_time: int = 0, + end_time: int = -1, + state: 'AgentState' = None, + **kwargs): + self.agent_id = agent_id + self.target_id = target_id + self.des = des + self.action_type = action_type + self.weapon_id = weapon_id + self.weapon_name = weapon_name + self.start_time = start_time + self.end_time = start_time if end_time == -1 else end_time + self.end_type = None + self.state = state + for key, value in kwargs.items(): + setattr(self, key, value) + + def json(self) -> dict: + from .utils import axial_to_cube_dict + return { + "curOrderedUnitID": self.agent_id, + "targetUnitID": self.target_id, + "destination": axial_to_cube_dict((self.des[0]-1000, self.des[1]-1000)), + "commandType": self.action_type.value + } + + def __str__(self): + if self.action_type == ActionType.MOVE: + return f"{self.agent_id} moves to {self.des}" + elif self.action_type == ActionType.ATTACK: + return f"{self.agent_id} attacks {self.target_id}" + elif self.action_type == ActionType.INTERACT: + return f"Interact {self.agent_id} with {self.target_id}" + elif self.action_type == ActionType.RELEASE: + return f"Release {self.target_id} from {self.agent_id} to {self.des}" + elif self.action_type == ActionType.END_OF_TURN: + return f"End of turn" + elif self.action_type == ActionType.SWITCH_WEAPON: + return f"{self.agent_id} switches to {self.weapon_name}" + else: + return "Unknown action" + + def to_dict(self): + return { + "agent_id": self.agent_id, + "target_id": self.target_id, + "des": self.des, + "action_type": self.action_type.name, + "weapon_id": self.weapon_id, + "weapon_name": self.weapon_name, + "start_time": self.start_time, + "end_type": self.end_type + } + +class Command(Action): + def __init__(self, **kwargs): + super().__init__(**kwargs) + + def description(self) -> List[str]: + if self.command.action_type == ActionType.MOVE: + return [ + f"Agent {self.agent_id} started to move to {self.command.des} at time {self.start_time}.", + f"Agent {self.agent_id} arrived at {self.command.des} at time {self.end_time}." + ] + elif self.command.action_type == ActionType.ATTACK: + return [ + f"Agent {self.agent_id} started to attack {self.command.des} at time {self.start_time}.", + f"Agent {self.agent_id} attacked {self.command.des} at time {self.end_time}." + ] + elif self.command.action_type == ActionType.SUPPLY: + return [ + f"Agent {self.agent_id} started to get supply from {self.command.des} at time {self.start_time}.", + f"Agent {self.agent_id} got supply from {self.command.des} at time {self.end_time}." + ] + elif self.command.action_type == ActionType.SWITCH_WEAPON: + return [ + f"Agent {self.agent_id} started to switch weapon to {self.command.weapon_name} at time {self.start_time}.", + f"Agent {self.agent_id} switched weapon to {self.command.weapon_name} at time {self.end_time}." + ] + else: + raise NotImplementedError(f"Unsupported command type: {self.command.action_type}") + +class Agent: + def __init__(self, + agent_id: str, + agent_type: AgentType, + team_id: int, + faction_id: int, + move_type: MoveType, + pos: Tuple[int, int], + info_level: int, stealth_level: int, + max_endurance: int, defense: int, + max_fuel: float, mobility: float, + switchable_weapons = [], + modules = [], + is_key: bool = False): + self.agent_id = agent_id + self.agent_type = agent_type + self.team_id = team_id + self.faction_id = faction_id + self.move_type = MoveType(move_type) + self.init_pos = pos + self.info_level = info_level + self.stealth_level = stealth_level + self.max_endurance = max_endurance + self.defense = defense + self.endurance = max_endurance + self.max_fuel = max_fuel + self.mobility = mobility + self.switchable_weapons = switchable_weapons + self.modules = modules + self.is_key = is_key + + # Get action space + from .utils import range_to_count + max_attack_range = max(weapon.attack_range for weapon in self.switchable_weapons) if self.switchable_weapons else 0 + max_capacity = max(module.capacity if hasattr(module, "capacity") else 0 for module in self.modules) if self.modules else 0 + # max_capacity = 6 if self.modules and any(hasattr(module, "parked_agents") for module in self.modules) else 0 + self._action_space = gym.spaces.Dict( + { + action_type: space + for action_type, space in { + ActionType.MOVE: gym.spaces.Discrete(range_to_count(int(self.mobility))) \ + if self.mobility > 0 else None, + ActionType.ATTACK: gym.spaces.Discrete(range_to_count(max_attack_range)) \ + if max_attack_range > 0 else None, + ActionType.SWITCH_WEAPON: gym.spaces.Discrete(len(self.switchable_weapons)) \ + if len(self.switchable_weapons) >= 1 else None, + ActionType.INTERACT: gym.spaces.Discrete(range_to_count(1)), + ActionType.RELEASE: gym.spaces.Discrete(max_capacity) \ + if max_capacity > 0 else None, + }.items() + if space is not None # 过滤掉值为 None 的键值对 + } + ) + + self._has_supply = False + self._available_types = [] + self._parked_agents = [] + self._capacity = max_capacity # 实际上应该最多只有一个运输单元 + + for module in self.modules: + if module.module_type == ModuleType.SUPPLY: + self._has_supply = True + self.supply = module + elif module.module_type == ModuleType.HANGER: + self._parked_agents = module.parked_agents + elif module.module_type == ModuleType.TRANSPORT: + self._parked_agents = module.parked_agents + self._available_types.extend(module.available_types) + + self.reset() + + def reset(self): + self.pos = self.init_pos + self.endurance = self.max_endurance + self.fuel = self.max_fuel + if self.switchable_weapons: + for weapon in self.switchable_weapons: + weapon.reset() + self.weapon = self.switchable_weapons[0] + else: + self.weapon = None + self.commenced_action = False + self.cmd_todo = [] + self.todo = [] + self.is_carried = False + + def __lt__(self, other): + return self.agent_id < other.agent_id + + def __eq__(self, other): + return self.agent_id == other.agent_id + + def __hash__(self): + return hash(self.agent_id) + + def __str__(self): + return f"{self.agent_id} ({self.move_type.name}) at {self.pos}" + + def __repr__(self): + return f"{self.agent_id} ({self.move_type.name}) at {self.pos}" + + def to_dict(self): + return { + "agent_id": self.agent_id, + "agent_type": self.agent_type.name, + "team_id": self.team_id, + "faction_id": self.faction_id, + "move_type": self.move_type.name, + # "info_level": self.info_level, + # "stealth_level": self.stealth_level, + "defense": self.defense, + "max_endurance": self.max_endurance, + "max_fuel": self.max_fuel, + "switchable_weapons": [weapon.to_dict() for weapon in self.switchable_weapons], + "modules": [module.to_dict() for module in self.modules], + # 以下为可变属性 + "pos": list(self.pos), + "endurance": self.endurance, + "fuel": self.fuel, + "mobility": self.mobility, + "weapon": self.weapon.to_dict() if self.weapon is not None else None + } + + def plan_to_dict(self): + state = self.todo[-1].state if self.todo else self.state + return { + "agent_id": self.agent_id, + "agent_type": self.agent_type.name, + "move_type": self.move_type.name, + "defense": self.defense, + "max_endurance": self.max_endurance, + "max_fuel": self.max_fuel, + "switchable_weapons": [weapon.to_dict() for weapon in self.switchable_weapons], + "modules": [module.to_dict() for module in self.modules], + "pos": list(state.pos), + "endurance": self.endurance, + "fuel": self.fuel, + "mobility": self.mobility, + "weapon": self.weapon.to_dict() if self.weapon is not None else None, + } + + @property + def alive(self): + return self.endurance > 0 + + @property + def attack_range(self): + return self.weapon.attack_range if self.weapon is not None else 0 + + @property + def strike_types(self): + return self.weapon.strike_types if self.weapon is not None else [] + + @property + def damage(self): + return self.weapon.damage if self.weapon is not None else 0 + + @property + def ammo(self): + return self.weapon.ammo if self.weapon is not None else 0 + + @property + def action_space(self): + return self._action_space + + @property + def has_supply(self): + return self._has_supply + + @property + def available_types(self): + return self._available_types + + @property + def parked_agents(self): + return self._parked_agents + + @property + def capacity(self): + return self._capacity + + @property + def state(self): + return AgentState(self.pos, self.endurance, self.fuel, self.weapon, self.commenced_action, self.cmd_todo, self.todo) + + def update(self, state: 'AgentState'): + self.pos = state.pos + self.endurance = state.endurance + self.fuel = state.fuel + self.commenced_action = state.commenced_action + self.cmd_todo = copy.deepcopy(state.cmd_todo) + self.todo = copy.deepcopy(state.todo) + self.weapon = copy.deepcopy(state.weapon) + +class AgentState: + def __init__(self, pos: Tuple[int, int], endurance: int, fuel: float, weapon: Weapon = None, commenced_action: bool = False, cmd_todo: List[Action] = [], todo: List[Action] = []): + self.pos = pos + self.endurance = endurance + self.fuel = fuel + self.commenced_action = commenced_action + self.weapon = copy.deepcopy(weapon) if weapon is not None else None + self.cmd_todo = copy.deepcopy(cmd_todo) + self.todo = copy.deepcopy(todo) + +class Team: + def __init__(self, team_id: int, faction_id: int, agents: dict[str, Agent] = {}): + self.team_id = team_id + self.faction_id = faction_id + self.agents = agents + self.reset() + + def __lt__(self, other): + return self.team_id < other.team_id + + def __eq__(self, other): + return self.team_id == other.team_id + + def __hash__(self): + return hash(self.team_id) + + def __str__(self): + return f"Team {self.team_id}" + + def __repr__(self): + return f"Team {self.team_id}" + + def add_agent(self, agent: Agent): + self.agents.append(agent) + + def remove_agent(self, agent_id: str): + self.agents.pop(agent_id) + + def reset(self): + for agent in self.agents.values(): + agent.reset() + + @property + def alive_count(self): + return sum(1 for agent in self.agents.values() if agent.alive) + + @property + def alive_ids(self): + return [agent.agent_id for agent in self.agents.values() if agent.alive] + +class TileNode: + def __init__(self, pos: Tuple[int, int], terrain_type: TerrainType, agent_id: str = None, is_city: bool = False): + self.pos = pos + self.terrain_type = TerrainType(terrain_type) + self.agent_id = agent_id + self.is_city = is_city + self.team_id = -1 + + def reset(self): + self.chaotic_value = 0 + self.occupy_value = 0 + self.occupied_by = None + self.agent_id = None + self.team_id = -1 + + def __lt__(self, other): + return self.pos < other.pos + + def __eq__(self, other): + return self.pos == other.pos + + def __hash__(self): + return hash(self.pos) + + def __str__(self): + return f"{self.pos} ({self.terrain_type.name})" + + def __repr__(self): + return f"{self.pos} ({self.terrain_type.name})" + + +class Map: + def __init__(self, nodes: dict[Tuple[int, int], TileNode]): + self.width = max(pos[0] for pos in nodes.keys()) + 1 + self.height = max(pos[1] for pos in nodes.keys()) + 1 + self.nodes = nodes + +class Module(ABC): + def __init__(self, module_type: ModuleType, add_endurance: int, add_ammo: int, add_fuel: float, available_types: List[AgentType] = None, capacity: int = 0): + self.module_type = module_type + self.add_endurance = add_endurance + self.add_ammo = add_ammo + self.add_fuel = add_fuel + self.available_types = available_types if available_types is not None else [] + self.capacity = capacity + + def to_dict(self): + return { + "module_type": self.module_type.name, + "add_endurance": self.add_endurance, + "add_ammo": self.add_ammo, + "add_fuel": self.add_fuel, + "available_types": [agent_type.name for agent_type in self.available_types] + } + +class Hanger(Module): + def __init__(self, available_types: List[AgentType] = None, capacity: int = 6): + super().__init__(module_type=ModuleType.HANGER, + add_endurance=4, + add_ammo=-1, + add_fuel=-1, + available_types=available_types, + capacity=capacity) + self.parked_agents = [] + + def reset(self): + self.parked_agents = [] + +class Supply(Module): + def __init__(self, available_types: List[AgentType] = None, capacity: int = 0): + super().__init__(module_type=ModuleType.SUPPLY, + add_endurance=2, + add_ammo=2, + add_fuel=-1, + available_types=available_types, + capacity=capacity) + +class Transport(Module): + def __init__(self, available_types: List[AgentType] = None, capacity: int = 6): + super().__init__(module_type=ModuleType.TRANSPORT, + add_endurance=0, + add_ammo=0, + add_fuel=0, + available_types=available_types, + capacity=capacity) + self.parked_agents = [] + + def reset(self): + self.parked_agents = [] + diff --git a/common/renderer.py b/common/renderer.py new file mode 100644 index 0000000..0d045e2 --- /dev/null +++ b/common/renderer.py @@ -0,0 +1,155 @@ +import numpy as np +import pygame +import math +from .utils import * +from .agent import TileNode, Agent, AgentType + +terrain_colors = [ + (245, 245, 220), # 米色 Plain + (210, 180, 140), # 褐色 Road + (224, 255, 255), # 青色 Water + (173, 216, 230) # 蓝色 Subwater +] + +team_colors = [ + (255, 0, 0), # 红色 + (0, 0, 255), # 蓝色 + (0, 255, 0), # 绿色 + (255, 255, 0), # 黄色 + (255, 165, 0), # 橙色 + (128, 0, 128), # 紫色 + (0, 255, 255), # 青色 + (255, 192, 203) # 粉色 +] + +icons_path = "./tianqiong/envs/icons" + +unit_icons: Dict[str, Dict[int, pygame.Surface]] = { + AgentType.AntiAir: {0: pygame.image.load(f"{icons_path}/AntiAir_b.png"), 1: pygame.image.load(f"{icons_path}/AntiAir_r.png")}, + AgentType.Airport: {0: pygame.image.load(f"{icons_path}/Airport_b.png"), 1: pygame.image.load(f"{icons_path}/Airport_r.png")}, + AgentType.Artillery: {0: pygame.image.load(f"{icons_path}/Artillery_b.png"), 1: pygame.image.load(f"{icons_path}/Artillery_r.png")}, + AgentType.Bomber: {0: pygame.image.load(f"{icons_path}/Bomber_b.png"), 1: pygame.image.load(f"{icons_path}/Bomber_r.png")}, + AgentType.Carrier: {0: pygame.image.load(f"{icons_path}/Carrier_b.png"), 1: pygame.image.load(f"{icons_path}/Carrier_r.png")}, + AgentType.Fighter: {0: pygame.image.load(f"{icons_path}/Fighter_b.png"), 1: pygame.image.load(f"{icons_path}/Fighter_r.png")}, + AgentType.Helicopter: {0: pygame.image.load(f"{icons_path}/Helicopter_b.png"), 1: pygame.image.load(f"{icons_path}/Helicopter_r.png")}, + AgentType.Infantry: {0: pygame.image.load(f"{icons_path}/Infantry_b.png"), 1: pygame.image.load(f"{icons_path}/Infantry_r.png")}, + AgentType.Tank: {0: pygame.image.load(f"{icons_path}/Tank_b.png"), 1: pygame.image.load(f"{icons_path}/Tank_r.png")}, + AgentType.TransportHelicopter: {0: pygame.image.load(f"{icons_path}/TransportHelicopter_b.png"), 1: pygame.image.load(f"{icons_path}/TransportHelicopter_r.png")}, + AgentType.TransportShip: {0: pygame.image.load(f"{icons_path}/TransportShip_b.png"), 1: pygame.image.load(f"{icons_path}/TransportShip_r.png")}, + AgentType.CombatShip: {0: pygame.image.load(f"{icons_path}/CombatShip_b.png"), 1: pygame.image.load(f"{icons_path}/CombatShip_r.png")}, +} + +class Renderer: + def __init__(self, nodes: List[TileNode], hex_size=20): + self.nodes = nodes + self.hex_size = hex_size + self.window_size, (self.offset_x, self.offset_y) = get_window_size_and_offsets(nodes, hex_size) + + def render(self, player_agents: List[Agent], spotted_enemy_agents: List[Agent], attack_agent: Agent=None, defend_agent: Agent=None) -> np.ndarray: + screen = pygame.Surface(self.window_size) + offset_x = self.offset_x + offset_y = self.offset_y + hex_size = self.hex_size + nodes = self.nodes + draw_hex_grid(screen, offset_x, offset_y, hex_size, nodes) + for agent in player_agents: + pos = agent.pos + alpha = agent.endurance / agent.max_endurance * 255 + draw_unit(screen, offset_x, offset_y, hex_size, axial_to_pixel(pos, hex_size), alpha=alpha, agent_type=agent.agent_type, team_id=agent.team_id) + for agent in spotted_enemy_agents: + pos = agent.pos + alpha = agent.endurance / agent.max_endurance * 255 + draw_unit(screen, offset_x, offset_y, hex_size, axial_to_pixel(pos, hex_size), alpha=alpha, agent_type=agent.agent_type, team_id=agent.team_id) + + if attack_agent and defend_agent: + start_pos = axial_to_pixel(attack_agent.pos, hex_size) + end_pos = axial_to_pixel(defend_agent.pos, hex_size) + draw_hexagon(screen, hex_size, (start_pos[0] + offset_x, start_pos[1] + offset_y), team_colors[attack_agent.team_id], True, 5) + draw_hexagon(screen, hex_size, (end_pos[0] + offset_x, end_pos[1] + offset_y), team_colors[attack_agent.team_id], True, 5) + + rgb_array = pygame.surfarray.array3d(screen) + + return np.transpose(rgb_array, (1, 0, 2)) + +# 轴坐标系到像素坐标的转换函数(尖角朝上) +def axial_to_pixel(pos: Tuple[int, int], hex_size): + q, r = pos + x = hex_size * math.sqrt(3) * q + y = hex_size * 3/2 * r + x += hex_size * math.sqrt(3) / 2 * r + return x, y + +# 绘制六边形的函数 +def draw_hexagon(surface, hex_size, center, color=(255, 255, 255), only_frame=False, line_width=1): + line_color = (169, 169, 169) # 灰色 + angle_offset = math.pi / 6 # 顶点指向上方 + points = [ + ( + center[0] + hex_size * math.cos(angle_offset + math.pi / 3 * i), + center[1] + hex_size * math.sin(angle_offset + math.pi / 3 * i) + ) + for i in range(6) + ] + if not only_frame: + pygame.draw.polygon(surface, color, points) + pygame.draw.polygon(surface, line_color, points, width=1) # 绘制六边形边框 + else: + pygame.draw.polygon(surface, color, points, width=line_width) # 绘制六边形边框 + +def get_window_size_and_offsets(nodes: List[TileNode], hex_size: int) -> Tuple[Tuple[int, int], Tuple[int, int]]: + min_x = min_y = float('inf') + max_x = max_y = float('-inf') + offset_x = offset_y = float('-inf') + + for node in nodes: + q, r = node.pos + x, y = axial_to_pixel((q, r), hex_size) + min_x = min(min_x, x) + max_x = max(max_x, x) + min_y = min(min_y, y) + max_y = max(max_y, y) + offset_x = max(offset_x, -x) + offset_y = max(offset_y, -y) + + # 加一些边距 + margin = hex_size * 2 + offset_x += margin + offset_y += margin + width = int(max_x - min_x + 2 * margin) + height = int(max_y - min_y + 2 * margin) + + width = (width - 15) // 16 * 16 + 16 + height = (height - 15) // 16 * 16 + 16 + + return (width, height), (int(offset_x), int(offset_y)) + +# 绘制单位的函数,带透明度 +def draw_unit(surface, offset_x, offset_y, hex_size, center, alpha=128, agent_type=None, team_id=None): + # print(agent_type, team_id) + if agent_type in unit_icons and team_id in unit_icons[agent_type]: + # 如果有图标,用图标表示 + icon = unit_icons[agent_type][team_id] + icon_size = hex_size * 1.2 + icon = pygame.transform.scale(icon, (icon_size, icon_size)) # 缩放图标至合适尺寸 + icon.set_alpha(alpha) # 设置透明度 + surface.blit(icon, (center[0] + offset_x - icon_size // 2, center[1] + offset_y - icon_size // 2)) + else: + # 如果没有图标,用圆圈表示单位 + unit_surface = pygame.Surface((hex_size, hex_size), pygame.SRCALPHA) + unit_surface.set_alpha(alpha) # 设置透明度 + color = team_colors[team_id] + pygame.draw.circle(unit_surface, color, (hex_size // 2, hex_size // 2), hex_size // 3) + surface.blit(unit_surface, (center[0] + offset_x - hex_size // 2, center[1] + offset_y - hex_size // 2)) + +# 主绘制功能 +def draw_hex_grid(surface, offset_x, offset_y, hex_size, nodes: List[TileNode]): + background_color = (255, 255, 255) # 白色 + surface.fill(background_color) + + for node in nodes: + q, r = node.pos + color = terrain_colors[node.terrain_type.value] + pixel_x, pixel_y = axial_to_pixel((q, r), hex_size) + center = (pixel_x + offset_x, pixel_y + offset_y) + draw_hexagon(surface, hex_size, center, color) + diff --git a/common/utils.py b/common/utils.py new file mode 100644 index 0000000..41864ea --- /dev/null +++ b/common/utils.py @@ -0,0 +1,301 @@ +import heapq +from .agent import Agent, Weapon, Action, TileNode, Module, Supply, Hanger, Transport +from .agent import MoveType, TerrainType, ActionType, AgentType, ModuleType +from typing import Tuple, List, Dict, Union +import numpy as np + +# Define the cost matrix for agent types and terrain types +cost_matrix = [ + # Plain Road Sea Hill + [ 1, 1, 1, 1], # Air + [ 1.5, 1, 0, 2], # Ground + [ 0, 0, 1, 0], # Sea + [ 0, 0, 1, 0], # Subwater +] + +def get_cost(move_type: MoveType, terrain_type: TerrainType): + return cost_matrix[move_type.value][terrain_type.value] + +def get_axial_dis(pos1: Tuple[int, int], pos2: Tuple[int, int] = (0, 0)): + q1, r1 = pos1 + q2, r2 = pos2 + return (abs(q1 - q2) + abs(r1 - r2) + abs((q1 + r1) - (q2 + r2))) / 2 + +def astar_search(map_data: Dict[Tuple[int, int], object], + move_type: MoveType, + start: Tuple[int, int], + goal: Tuple[int, int] = None, + limit: float = float('inf'), + return_cost: bool = False) -> Union[List[Tuple[int, int]], Tuple[float, List[Tuple[int, int]]]]: + """ + A* search algorithm to find the shortest path from start to goal. + map_data: a dictionary of nodes, where the keys are the node coordinates and the values are the node objects + move_type: the type of agent to consider (0 for air, 1 for ground, 2 for sea, 3 for subwater) + start: the starting node + goal: the ending node + """ + if isinstance(start, tuple): + start = [start] + multi_source = False + elif isinstance(start, list): + multi_source = True + else: + raise ValueError(f"Invalid start type. Found{type(start)} but expected tuple or list.") + + g = {} + f = {} + parent = {} + open_list = [] + cand_parent_count = {} + + for pos in start: + g[pos] = 0 + f[pos] = 0 + parent[pos] = None + cand_parent_count[pos] = 1 + open_list.append((0.0, pos)) + + in_open_list = set(start) + heapq.heapify(open_list) + closed_list = set() + + directions = [(1, 0), (1, -1), (0, -1), (-1, 0), (-1, 1), (0, 1)] + + while open_list: + _, cur = heapq.heappop(open_list) + in_open_list.remove(cur) + + if goal and cur == goal: + path = [] + while cur not in start: + path.append(cur) + cur = parent[cur] + if multi_source: + if path: + path.pop(0) # 弹出实际上当前所在位置 + path.append(cur) # 确定多源最短路的终点 + else: + path.reverse() + return (cost, path) if return_cost else path + + closed_list.add(cur) + + for dq, dr in directions: + neighbor = (cur[0] + dq, cur[1] + dr) + if neighbor in map_data and neighbor not in closed_list: + cost = get_cost(move_type, map_data[neighbor].terrain_type) \ + if not multi_source \ + else get_cost(move_type, map_data[cur].terrain_type) + if cost == 0: + continue + tentative_g = g[cur] + cost + if neighbor not in g or tentative_g < g[neighbor]: + g[neighbor] = tentative_g + f[neighbor] = tentative_g + (get_axial_dis(neighbor, goal) if goal else 0) + if f[neighbor] > limit: + continue + parent[neighbor] = cur + cand_parent_count[neighbor] = 1 + if neighbor not in in_open_list: + heapq.heappush(open_list, (f[neighbor], neighbor)) + in_open_list.add(neighbor) + elif neighbor in g and tentative_g == g[neighbor] and f[neighbor] <= limit: + cand_parent_count[neighbor] += 1 + if np.random.random() < 1 / cand_parent_count[neighbor]: # reservior sampling + parent[neighbor] = cur + + if goal is not None: + raise ValueError("No path found") + + return list(closed_list) + +def get_path(agent, map_data, start, des: Union[Tuple[int, int], List[Tuple[int, int]]], limit=float('inf')) -> List[Tuple[int, int]]: + """ + Returns a list of move actions to move the agent from its current position to the destination. + """ + if isinstance(des, tuple): + raw_path = astar_search(map_data=map_data, move_type=agent.move_type, start=start, goal=des, limit=limit) + elif isinstance(des, list): # multi-source + raw_path = astar_search(map_data=map_data, move_type=agent.move_type, start=des, goal=start, limit=limit) + else: + raise ValueError(f"Invalid destination type. Found{type(des)} but expected tuple or list.") + + if not raw_path: + raise ValueError("No path found") + + # Create the frame path by breaking the path into smaller moves + path = [] + sum = 0 + raw_path = [start] + raw_path + + # print(agent.pos, des, raw_path) + + for parent, node in zip(raw_path[:-1], raw_path[1:]): + cost = get_cost(agent.move_type, map_data[node].terrain_type) + sum += cost + if sum > agent.fuel: + raise ValueError(f"Not enough fuel to reach {des}") + if sum > agent.mobility: + sum = cost + path.append(parent) + path.append(node) + return path + +def axial_to_cube_dict(a): + q, r = a + return { + "x": q, + "y": -(q + r), + "z": r + } + +def cube_dict_to_axial(c): + return (c["x"], c["z"]) + +def axial_to_cube(a): + q, r = a + return (q, -(q + r), r) + +def cube_to_axial(c): + return (c[0], c[2]) + +def encode_axial(pos: Tuple[int, int]) -> int: + q, r = pos + if q == 0 and r == 0: + return 0 + + x, y, z = axial_to_cube(pos) + d = get_axial_dis(pos) + + base_number = 3 * d * (d - 1) + 1 + + if x > 0 and y < 0 and z >= 0: # 0 + return base_number + z + elif x <= 0 and y < 0 and z > 0: # 1 + return base_number + d - x + elif x < 0 and y >= 0 and z > 0: # 2 + return base_number + d * 2 + y + elif x < 0 and y > 0 and z <= 0: # 3 + return base_number + d * 3 - z + elif x >= 0 and y > 0 and z < 0: # 4 + return base_number + d * 4 + x + else: # x > 0 and y <= 0 and z < 0 # 5 + return base_number + d * 5 - y + +def decode_axial(num: int) -> Tuple[int, int]: + if num == 0: + return (0, 0) + + l, r = 1, num + while l < r: + mid = (l + r) // 2 + if range_to_count(mid) + 1 > num: + r = mid + else: + l = mid + 1 + d = l + base_number = range_to_count(d - 1) + 1 + # base_number = 1 + # d = 1 + # while base_number + 6 * d <= num: + # base_number += 6 * d + # d += 1 + + k = (num - base_number) // d + mod = (num - base_number) % d + + if k == 0: + q = d - mod + r = mod + elif k == 1: + q = -mod + r = d + elif k == 2: + q = -d + r = d - mod + elif k == 3: + q = -(d - mod) + r = -mod + elif k == 4: + q = mod + r = -d + else: # k == 5 + q = d + r = -(d - mod) + + return (q, r) + +def get_adj_pos(pos: Tuple[int, int], max_dis: int) -> List[Tuple[int, int]]: + """ + Returns a list of adjacent positons within the given range. + """ + d = [decode_axial(num) for num in range(1, range_to_count(int(max_dis)) + 1)] + return [(pos[0] + dq, pos[1] + dr) for dq, dr in d] + +def range_to_count(max_dis: int) -> int: # not including 0 + return 3 * max_dis * (max_dis + 1) + +def dict_to_action(action_dict: dict) -> Action: + return Action( + action_type=ActionType.from_input(action_dict["action_type"]), + agent_id=action_dict["agent_id"], + target_id=action_dict.get("target_id", ""), + des=action_dict.get("des", (-1, -1)), + weapon_id=action_dict.get("weapon_id", -1), + weapon_name=action_dict.get("weapon_name", ""), + start_time=action_dict.get("start_time", 0), + end_type=action_dict.get("end_type", None), + attack_count=action_dict.get("attack_count", 0), + ) + +def dict_to_node(tile_dict: dict) -> TileNode: + return TileNode( + pos=(tile_dict["pos"]["q"], tile_dict["pos"]["r"]), + terrain_type=TerrainType.from_input(tile_dict["terrain_type"]), + # agent_id=tile_dict.get("agent_id", None), + # is_city=tile_dict.get("is_city", False), + # chaotic_value=tile_dict.get("chaotic_value", 0), + # occupy_value=tile_dict.get("occupy_value", 0), + # occupied_by=tile_dict.get("occupied_by", None) + ) + +def dict_to_weapon(weapon_dict: dict) -> Weapon: + return Weapon( + name=weapon_dict["name"], + attack_range=weapon_dict["attack_range"], + damage=weapon_dict["damage"], + max_ammo=weapon_dict["max_ammo"], + strike_types=[MoveType.from_input(strike_type) for strike_type in weapon_dict["strike_types"]] + ) + +def dict_to_module(module_dict: dict) -> Module: + module_type = ModuleType.from_input(module_dict["module_id"]) + available_types = [AgentType.from_input(agent_type) for agent_type in module_dict.get("available_types", [])] \ + if module_dict.get("available_types", []) is not None else [] + capacity = module_dict.get("capacity", 0) + if module_type == ModuleType.HANGER: + return Hanger(available_types=available_types, capacity=capacity) + elif module_type == ModuleType.SUPPLY: + return Supply(available_types=available_types, capacity=capacity) + elif module_type == ModuleType.TRANSPORT: + return Transport(available_types=available_types, capacity=capacity) + else: + raise ValueError(f"Invalid module type: {module_type}") + +def dict_to_agent(agent_dict: dict) -> Agent: + return Agent( + agent_id=agent_dict["agent_id"], + agent_type=AgentType.from_input(agent_dict["type"]), + team_id=agent_dict["team_id"], + faction_id=agent_dict["faction_id"], + move_type=MoveType.from_input(agent_dict["move_type"]), + pos=(agent_dict["pos"]["q"], agent_dict["pos"]["r"]), + info_level=agent_dict.get("info_level", 0), + stealth_level=agent_dict.get("stealth_level", 0), + max_endurance=agent_dict["max_endurance"], + defense=agent_dict["defense"], + max_fuel=agent_dict["max_fuel"], + mobility=agent_dict["mobility"], + switchable_weapons=[dict_to_weapon(weapon_dict) for weapon_dict in agent_dict.get("switchable_weapons", [])], + modules=[dict_to_module(module_dict) for module_dict in agent_dict.get("modules", [])] if agent_dict.get("modules", []) is not None else [] + ) \ No newline at end of file diff --git a/data/test/AgentsInfo.json b/data/test/AgentsInfo.json new file mode 100644 index 0000000..2b5126b --- /dev/null +++ b/data/test/AgentsInfo.json @@ -0,0 +1,603 @@ +[ + { + "agent_id": "879a", + "pos": { + "q": 10, + "r": 0 + }, + "max_endurance": 10.0, + "max_fuel": 0.0, + "type": 16, + "team_id": 0, + "faction_id": 0, + "move_type": 1, + "defense": 5.0, + "mobility": 0.0, + "info_level": 6, + "stealth_level": 0.0, + "switchable_weapons": [], + "modules": [ + { + "module_id": 0, + "capacity": 6, + "available_types": [ + 4, + 5, + 6, + 19, + 20 + ] + }, + { + "module_id": 1 + } + ] + }, + { + "agent_id": "9f19", + "pos": { + "q": 11, + "r": 0 + }, + "max_endurance": 4.0, + "max_fuel": 120.0, + "type": 5, + "team_id": 0, + "faction_id": 0, + "move_type": 0, + "defense": 1.0, + "mobility": 6.0, + "info_level": 6, + "stealth_level": 0.0, + "switchable_weapons": [ + { + "name": "AKD-21\u53cd\u5766\u514b\u5bfc\u5f39", + "attack_range": 6, + "damage": 8.0, + "max_ammo": 6, + "strike_types": [ + 1 + ] + }, + { + "name": "LS-6\u5236\u5bfc\u70b8\u5f39", + "attack_range": 5, + "damage": 15.0, + "max_ammo": 1, + "strike_types": [ + 1 + ] + }, + { + "name": "PL-15\u7a7a\u7a7a\u5bfc\u5f39", + "attack_range": 8, + "damage": 7.0, + "max_ammo": 4, + "strike_types": [ + 0 + ] + }, + { + "name": "YJ-91\u7a7a\u8230\u5bfc\u5f39", + "attack_range": 6, + "damage": 15.0, + "max_ammo": 2, + "strike_types": [ + 2 + ] + } + ] + }, + { + "agent_id": "a947", + "pos": { + "q": 12, + "r": 0 + }, + "max_endurance": 4.0, + "max_fuel": 120.0, + "type": 5, + "team_id": 0, + "faction_id": 0, + "move_type": 0, + "defense": 1.0, + "mobility": 6.0, + "info_level": 6, + "stealth_level": 0.0, + "switchable_weapons": [ + { + "name": "AKD-21\u53cd\u5766\u514b\u5bfc\u5f39", + "attack_range": 50, + "damage": 8.0, + "max_ammo": 6, + "strike_types": [ + 1 + ] + }, + { + "name": "LS-6\u5236\u5bfc\u70b8\u5f39", + "attack_range": 5, + "damage": 15.0, + "max_ammo": 1, + "strike_types": [ + 1 + ] + }, + { + "name": "PL-15\u7a7a\u7a7a\u5bfc\u5f39", + "attack_range": 8, + "damage": 7.0, + "max_ammo": 4, + "strike_types": [ + 0 + ] + }, + { + "name": "YJ-91\u7a7a\u8230\u5bfc\u5f39", + "attack_range": 6, + "damage": 15.0, + "max_ammo": 2, + "strike_types": [ + 2 + ] + } + ] + }, + { + "agent_id": "cc4c", + "pos": { + "q": 13, + "r": 0 + }, + "max_endurance": 4.0, + "max_fuel": 120.0, + "type": 5, + "team_id": 1, + "faction_id": 0, + "move_type": 0, + "defense": 1.0, + "mobility": 6.0, + "info_level": 6, + "stealth_level": 0.0, + "switchable_weapons": [ + { + "name": "AKD-21\u53cd\u5766\u514b\u5bfc\u5f39", + "attack_range": 6, + "damage": 8.0, + "max_ammo": 6, + "strike_types": [ + 1 + ] + }, + { + "name": "LS-6\u5236\u5bfc\u70b8\u5f39", + "attack_range": 5, + "damage": 15.0, + "max_ammo": 1, + "strike_types": [ + 1 + ] + }, + { + "name": "PL-15\u7a7a\u7a7a\u5bfc\u5f39", + "attack_range": 8, + "damage": 7.0, + "max_ammo": 4, + "strike_types": [ + 0 + ] + }, + { + "name": "YJ-91\u7a7a\u8230\u5bfc\u5f39", + "attack_range": 6, + "damage": 15.0, + "max_ammo": 2, + "strike_types": [ + 2 + ] + } + ] + }, + { + "agent_id": "c865", + "pos": { + "q": 14, + "r": 0 + }, + "max_endurance": 4.0, + "max_fuel": 120.0, + "type": 5, + "team_id": 1, + "faction_id": 0, + "move_type": 0, + "defense": 1.0, + "mobility": 6.0, + "info_level": 6, + "stealth_level": 0.0, + "switchable_weapons": [ + { + "name": "AKD-21\u53cd\u5766\u514b\u5bfc\u5f39", + "attack_range": 6, + "damage": 8.0, + "max_ammo": 6, + "strike_types": [ + 1 + ] + }, + { + "name": "LS-6\u5236\u5bfc\u70b8\u5f39", + "attack_range": 5, + "damage": 15.0, + "max_ammo": 1, + "strike_types": [ + 1 + ] + }, + { + "name": "PL-15\u7a7a\u7a7a\u5bfc\u5f39", + "attack_range": 8, + "damage": 7.0, + "max_ammo": 4, + "strike_types": [ + 0 + ] + }, + { + "name": "YJ-91\u7a7a\u8230\u5bfc\u5f39", + "attack_range": 6, + "damage": 15.0, + "max_ammo": 2, + "strike_types": [ + 2 + ] + } + ] + }, + { + "agent_id": "4189", + "pos": { + "q": 22, + "r": 0 + }, + "max_endurance": 10.0, + "max_fuel": 120.0, + "type": 1, + "team_id": 1, + "faction_id": 0, + "move_type": 1, + "defense": 3.0, + "mobility": 4.0, + "info_level": 6, + "stealth_level": 0.0, + "switchable_weapons": [] + }, + { + "agent_id": "0cd5", + "pos": { + "q": 23, + "r": 0 + }, + "max_endurance": 10.0, + "max_fuel": 120.0, + "type": 1, + "team_id": 1, + "faction_id": 0, + "move_type": 1, + "defense": 3.0, + "mobility": 4.0, + "info_level": 6, + "stealth_level": 0.0, + "switchable_weapons": [] + }, + { + "agent_id": "2602", + "pos": { + "q": 24, + "r": 0 + }, + "max_endurance": 3.0, + "max_fuel": 80.0, + "type": 12, + "team_id": 2, + "faction_id": 1, + "move_type": 0, + "defense": 0.0, + "mobility": 5.0, + "info_level": 6, + "stealth_level": 0.0, + "switchable_weapons": [], + "switchable_weaponsNames": [] + }, + { + "agent_id": "c53f", + "pos": { + "q": 2, + "r": 14 + }, + "max_endurance": 10.0, + "max_fuel": 0.0, + "type": 16, + "team_id": 2, + "faction_id": 1, + "defense": 5.0, + "mobility": 0.0, + "info_level": 6, + "stealth_level": 0.0, + "switchable_weapons": [], + "switchable_weaponsNames": [] + }, + { + "agent_id": "95f3", + "pos": { + "q": 3, + "r": 14 + }, + "max_endurance": 4.0, + "max_fuel": 120.0, + "type": 5, + "team_id": 2, + "faction_id": 1, + "move_type": 0, + "defense": 1.0, + "mobility": 6.0, + "info_level": 6, + "stealth_level": 0.0, + "switchable_weapons": [ + { + "name": "AKD-21\u53cd\u5766\u514b\u5bfc\u5f39", + "attack_range": 6, + "damage": 8.0, + "max_ammo": 6, + "strike_types": [ + 1 + ] + }, + { + "name": "LS-6\u5236\u5bfc\u70b8\u5f39", + "attack_range": 5, + "damage": 15.0, + "max_ammo": 1, + "strike_types": [ + 1 + ] + }, + { + "name": "PL-15\u7a7a\u7a7a\u5bfc\u5f39", + "attack_range": 8, + "damage": 7.0, + "max_ammo": 4, + "strike_types": [ + 0 + ] + }, + { + "name": "YJ-91\u7a7a\u8230\u5bfc\u5f39", + "attack_range": 6, + "damage": 15.0, + "max_ammo": 2, + "strike_types": [ + 2 + ] + } + ] + }, + { + "agent_id": "7772", + "pos": { + "q": 4, + "r": 14 + }, + "max_endurance": 4.0, + "max_fuel": 120.0, + "type": 5, + "team_id": 2, + "faction_id": 1, + "move_type": 0, + "defense": 1.0, + "mobility": 6.0, + "info_level": 6, + "stealth_level": 0.0, + "switchable_weapons": [ + { + "name": "AKD-21\u53cd\u5766\u514b\u5bfc\u5f39", + "attack_range": 6, + "damage": 8.0, + "max_ammo": 6, + "strike_types": [ + 1 + ] + }, + { + "name": "LS-6\u5236\u5bfc\u70b8\u5f39", + "attack_range": 5, + "damage": 15.0, + "max_ammo": 1, + "strike_types": [ + 1 + ] + }, + { + "name": "PL-15\u7a7a\u7a7a\u5bfc\u5f39", + "attack_range": 8, + "damage": 7.0, + "max_ammo": 4, + "strike_types": [ + 0 + ] + }, + { + "name": "YJ-91\u7a7a\u8230\u5bfc\u5f39", + "attack_range": 6, + "damage": 15.0, + "max_ammo": 2, + "strike_types": [ + 2 + ] + } + ] + }, + { + "agent_id": "6f9d", + "pos": { + "q": 5, + "r": 14 + }, + "max_endurance": 4.0, + "max_fuel": 120.0, + "type": 5, + "team_id": 3, + "faction_id": 2, + "move_type": 0, + "defense": 1.0, + "mobility": 6.0, + "info_level": 6, + "stealth_level": 0.0, + "switchable_weapons": [ + { + "name": "AKD-21\u53cd\u5766\u514b\u5bfc\u5f39", + "attack_range": 6, + "damage": 8.0, + "max_ammo": 6, + "strike_types": [ + 1 + ] + }, + { + "name": "LS-6\u5236\u5bfc\u70b8\u5f39", + "attack_range": 5, + "damage": 15.0, + "max_ammo": 1, + "strike_types": [ + 1 + ] + }, + { + "name": "PL-15\u7a7a\u7a7a\u5bfc\u5f39", + "attack_range": 8, + "damage": 7.0, + "max_ammo": 4, + "strike_types": [ + 0 + ] + }, + { + "name": "YJ-91\u7a7a\u8230\u5bfc\u5f39", + "attack_range": 6, + "damage": 15.0, + "max_ammo": 2, + "strike_types": [ + 2 + ] + } + ] + }, + { + "agent_id": "2477", + "pos": { + "q": 6, + "r": 14 + }, + "max_endurance": 4.0, + "max_fuel": 120.0, + "type": 5, + "team_id": 3, + "faction_id": 2, + "move_type": 0, + "defense": 1.0, + "mobility": 6.0, + "info_level": 6, + "stealth_level": 0.0, + "switchable_weapons": [ + { + "name": "AKD-21\u53cd\u5766\u514b\u5bfc\u5f39", + "attack_range": 6, + "damage": 8.0, + "max_ammo": 6, + "strike_types": [ + 1 + ] + }, + { + "name": "LS-6\u5236\u5bfc\u70b8\u5f39", + "attack_range": 5, + "damage": 15.0, + "max_ammo": 1, + "strike_types": [ + 1 + ] + }, + { + "name": "PL-15\u7a7a\u7a7a\u5bfc\u5f39", + "attack_range": 8, + "damage": 7.0, + "max_ammo": 4, + "strike_types": [ + 0 + ] + }, + { + "name": "YJ-91\u7a7a\u8230\u5bfc\u5f39", + "attack_range": 6, + "damage": 15.0, + "max_ammo": 2, + "strike_types": [ + 2 + ] + } + ] + }, + { + "agent_id": "9183", + "pos": { + "q": 7, + "r": 14 + }, + "max_endurance": 10.0, + "max_fuel": 120.0, + "type": 1, + "team_id": 3, + "faction_id": 2, + "move_type": 1, + "defense": 3.0, + "mobility": 4.0, + "info_level": 6, + "stealth_level": 0.0, + "switchable_weapons": [] + }, + { + "agent_id": "45e5", + "pos": { + "q": 18, + "r": 14 + }, + "max_endurance": 10.0, + "max_fuel": 120.0, + "type": 1, + "team_id": 3, + "faction_id": 2, + "move_type": 1, + "defense": 3.0, + "mobility": 4.0, + "info_level": 6, + "stealth_level": 0.0, + "switchable_weapons": [] + }, + { + "agent_id": "7e23", + "pos": { + "q": 19, + "r": 14 + }, + "max_endurance": 3.0, + "max_fuel": 80.0, + "type": 12, + "team_id": 4, + "faction_id": 3, + "move_type": 0, + "defense": 0.0, + "mobility": 5.0, + "info_level": 6, + "stealth_level": 0.0, + "switchable_weapons": [] + } +] \ No newline at end of file diff --git a/data/test/MapInfo.json b/data/test/MapInfo.json new file mode 100644 index 0000000..61f0f64 --- /dev/null +++ b/data/test/MapInfo.json @@ -0,0 +1,2802 @@ +[ + { + "pos": { + "q": 10, + "r": 0 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 10, + "r": 0 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 11, + "r": 0 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 12, + "r": 0 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 13, + "r": 0 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 14, + "r": 0 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 15, + "r": 0 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 16, + "r": 0 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 17, + "r": 0 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 18, + "r": 0 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 19, + "r": 0 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 20, + "r": 0 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 21, + "r": 0 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 22, + "r": 0 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 23, + "r": 0 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 24, + "r": 0 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 25, + "r": 0 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 26, + "r": 0 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 27, + "r": 0 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 28, + "r": 0 + }, + "terrain_type": 1 + }, + { + "pos": { + "q": 9, + "r": 1 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 10, + "r": 1 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 11, + "r": 1 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 12, + "r": 1 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 13, + "r": 1 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 14, + "r": 1 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 15, + "r": 1 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 16, + "r": 1 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 17, + "r": 1 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 18, + "r": 1 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 19, + "r": 1 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 20, + "r": 1 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 21, + "r": 1 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 22, + "r": 1 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 23, + "r": 1 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 24, + "r": 1 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 25, + "r": 1 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 26, + "r": 1 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 27, + "r": 1 + }, + "terrain_type": 1 + }, + { + "pos": { + "q": 28, + "r": 1 + }, + "terrain_type": 1 + }, + { + "pos": { + "q": 8, + "r": 2 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 9, + "r": 2 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 10, + "r": 2 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 11, + "r": 2 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 12, + "r": 2 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 13, + "r": 2 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 14, + "r": 2 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 15, + "r": 2 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 16, + "r": 2 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 17, + "r": 2 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 18, + "r": 2 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 19, + "r": 2 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 20, + "r": 2 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 21, + "r": 2 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 22, + "r": 2 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 23, + "r": 2 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 24, + "r": 2 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 25, + "r": 2 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 26, + "r": 2 + }, + "terrain_type": 1 + }, + { + "pos": { + "q": 27, + "r": 2 + }, + "terrain_type": 1 + }, + { + "pos": { + "q": 8, + "r": 3 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 9, + "r": 3 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 10, + "r": 3 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 11, + "r": 3 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 12, + "r": 3 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 13, + "r": 3 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 14, + "r": 3 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 15, + "r": 3 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 16, + "r": 3 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 17, + "r": 3 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 18, + "r": 3 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 19, + "r": 3 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 20, + "r": 3 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 21, + "r": 3 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 22, + "r": 3 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 23, + "r": 3 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 24, + "r": 3 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 25, + "r": 3 + }, + "terrain_type": 1 + }, + { + "pos": { + "q": 26, + "r": 3 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 27, + "r": 3 + }, + "terrain_type": 1 + }, + { + "pos": { + "q": 7, + "r": 4 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 8, + "r": 4 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 9, + "r": 4 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 10, + "r": 4 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 11, + "r": 4 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 12, + "r": 4 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 13, + "r": 4 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 14, + "r": 4 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 15, + "r": 4 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 16, + "r": 4 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 17, + "r": 4 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 18, + "r": 4 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 19, + "r": 4 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 20, + "r": 4 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 21, + "r": 4 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 22, + "r": 4 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 23, + "r": 4 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 24, + "r": 4 + }, + "terrain_type": 1 + }, + { + "pos": { + "q": 25, + "r": 4 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 26, + "r": 4 + }, + "terrain_type": 1 + }, + { + "pos": { + "q": 7, + "r": 5 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 8, + "r": 5 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 9, + "r": 5 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 10, + "r": 5 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 11, + "r": 5 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 12, + "r": 5 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 13, + "r": 5 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 14, + "r": 5 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 15, + "r": 5 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 16, + "r": 5 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 17, + "r": 5 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 18, + "r": 5 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 19, + "r": 5 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 20, + "r": 5 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 21, + "r": 5 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 22, + "r": 5 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 23, + "r": 5 + }, + "terrain_type": 1 + }, + { + "pos": { + "q": 24, + "r": 5 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 25, + "r": 5 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 26, + "r": 5 + }, + "terrain_type": 1 + }, + { + "pos": { + "q": 6, + "r": 6 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 7, + "r": 6 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 8, + "r": 6 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 9, + "r": 6 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 10, + "r": 6 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 11, + "r": 6 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 12, + "r": 6 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 13, + "r": 6 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 14, + "r": 6 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 15, + "r": 6 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 16, + "r": 6 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 17, + "r": 6 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 18, + "r": 6 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 19, + "r": 6 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 20, + "r": 6 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 21, + "r": 6 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 22, + "r": 6 + }, + "terrain_type": 1 + }, + { + "pos": { + "q": 23, + "r": 6 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 24, + "r": 6 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 25, + "r": 6 + }, + "terrain_type": 1 + }, + { + "pos": { + "q": 6, + "r": 7 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 7, + "r": 7 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 8, + "r": 7 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 9, + "r": 7 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 10, + "r": 7 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 11, + "r": 7 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 12, + "r": 7 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 13, + "r": 7 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 14, + "r": 7 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 15, + "r": 7 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 16, + "r": 7 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 17, + "r": 7 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 18, + "r": 7 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 19, + "r": 7 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 20, + "r": 7 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 21, + "r": 7 + }, + "terrain_type": 1 + }, + { + "pos": { + "q": 22, + "r": 7 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 23, + "r": 7 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 24, + "r": 7 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 25, + "r": 7 + }, + "terrain_type": 1 + }, + { + "pos": { + "q": 5, + "r": 8 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 6, + "r": 8 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 7, + "r": 8 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 8, + "r": 8 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 9, + "r": 8 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 10, + "r": 8 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 11, + "r": 8 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 12, + "r": 8 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 13, + "r": 8 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 14, + "r": 8 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 15, + "r": 8 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 16, + "r": 8 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 17, + "r": 8 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 18, + "r": 8 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 19, + "r": 8 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 20, + "r": 8 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 21, + "r": 8 + }, + "terrain_type": 1 + }, + { + "pos": { + "q": 22, + "r": 8 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 23, + "r": 8 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 24, + "r": 8 + }, + "terrain_type": 1 + }, + { + "pos": { + "q": 5, + "r": 9 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 6, + "r": 9 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 7, + "r": 9 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 8, + "r": 9 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 9, + "r": 9 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 10, + "r": 9 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 11, + "r": 9 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 12, + "r": 9 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 13, + "r": 9 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 14, + "r": 9 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 15, + "r": 9 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 16, + "r": 9 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 17, + "r": 9 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 18, + "r": 9 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 19, + "r": 9 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 20, + "r": 9 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 21, + "r": 9 + }, + "terrain_type": 1 + }, + { + "pos": { + "q": 22, + "r": 9 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 23, + "r": 9 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 24, + "r": 9 + }, + "terrain_type": 1 + }, + { + "pos": { + "q": 4, + "r": 10 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 5, + "r": 10 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 6, + "r": 10 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 7, + "r": 10 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 8, + "r": 10 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 9, + "r": 10 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 10, + "r": 10 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 11, + "r": 10 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 12, + "r": 10 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 13, + "r": 10 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 14, + "r": 10 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 15, + "r": 10 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 16, + "r": 10 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 17, + "r": 10 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 18, + "r": 10 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 19, + "r": 10 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 20, + "r": 10 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 21, + "r": 10 + }, + "terrain_type": 1 + }, + { + "pos": { + "q": 22, + "r": 10 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 23, + "r": 10 + }, + "terrain_type": 1 + }, + { + "pos": { + "q": 4, + "r": 11 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 5, + "r": 11 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 6, + "r": 11 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 7, + "r": 11 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 8, + "r": 11 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 9, + "r": 11 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 10, + "r": 11 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 11, + "r": 11 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 12, + "r": 11 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 13, + "r": 11 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 14, + "r": 11 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 15, + "r": 11 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 16, + "r": 11 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 17, + "r": 11 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 18, + "r": 11 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 19, + "r": 11 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 20, + "r": 11 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 21, + "r": 11 + }, + "terrain_type": 1 + }, + { + "pos": { + "q": 22, + "r": 11 + }, + "terrain_type": 1 + }, + { + "pos": { + "q": 23, + "r": 11 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 3, + "r": 12 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 4, + "r": 12 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 5, + "r": 12 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 6, + "r": 12 + }, + "terrain_type": 2 + }, + { + "pos": { + "q": 7, + "r": 12 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 8, + "r": 12 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 9, + "r": 12 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 10, + "r": 12 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 11, + "r": 12 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 12, + "r": 12 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 13, + "r": 12 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 14, + "r": 12 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 15, + "r": 12 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 16, + "r": 12 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 17, + "r": 12 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 18, + "r": 12 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 19, + "r": 12 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 20, + "r": 12 + }, + "terrain_type": 1 + }, + { + "pos": { + "q": 21, + "r": 12 + }, + "terrain_type": 1 + }, + { + "pos": { + "q": 22, + "r": 12 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 3, + "r": 13 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 4, + "r": 13 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 5, + "r": 13 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 6, + "r": 13 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 7, + "r": 13 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 8, + "r": 13 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 9, + "r": 13 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 10, + "r": 13 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 11, + "r": 13 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 12, + "r": 13 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 13, + "r": 13 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 14, + "r": 13 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 15, + "r": 13 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 16, + "r": 13 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 17, + "r": 13 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 18, + "r": 13 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 19, + "r": 13 + }, + "terrain_type": 1 + }, + { + "pos": { + "q": 20, + "r": 13 + }, + "terrain_type": 1 + }, + { + "pos": { + "q": 21, + "r": 13 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 22, + "r": 13 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 2, + "r": 14 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 3, + "r": 14 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 4, + "r": 14 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 5, + "r": 14 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 6, + "r": 14 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 7, + "r": 14 + }, + "terrain_type": 0 + }, + { + "pos": { + "q": 8, + "r": 14 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 9, + "r": 14 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 10, + "r": 14 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 11, + "r": 14 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 12, + "r": 14 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 13, + "r": 14 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 14, + "r": 14 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 15, + "r": 14 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 16, + "r": 14 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 17, + "r": 14 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 18, + "r": 14 + }, + "terrain_type": 1 + }, + { + "pos": { + "q": 19, + "r": 14 + }, + "terrain_type": 1 + }, + { + "pos": { + "q": 20, + "r": 14 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 21, + "r": 14 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 2, + "r": 15 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 3, + "r": 15 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 4, + "r": 15 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 5, + "r": 15 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 6, + "r": 15 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 7, + "r": 15 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 8, + "r": 15 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 9, + "r": 15 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 10, + "r": 15 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 11, + "r": 15 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 12, + "r": 15 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 13, + "r": 15 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 14, + "r": 15 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 15, + "r": 15 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 16, + "r": 15 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 17, + "r": 15 + }, + "terrain_type": 1 + }, + { + "pos": { + "q": 18, + "r": 15 + }, + "terrain_type": 1 + }, + { + "pos": { + "q": 19, + "r": 15 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 20, + "r": 15 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 21, + "r": 15 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 1, + "r": 16 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 2, + "r": 16 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 3, + "r": 16 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 4, + "r": 16 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 5, + "r": 16 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 6, + "r": 16 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 7, + "r": 16 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 8, + "r": 16 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 9, + "r": 16 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 10, + "r": 16 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 11, + "r": 16 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 12, + "r": 16 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 13, + "r": 16 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 14, + "r": 16 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 15, + "r": 16 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 16, + "r": 16 + }, + "terrain_type": 1 + }, + { + "pos": { + "q": 17, + "r": 16 + }, + "terrain_type": 1 + }, + { + "pos": { + "q": 18, + "r": 16 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 19, + "r": 16 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 20, + "r": 16 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 1, + "r": 17 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 2, + "r": 17 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 3, + "r": 17 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 4, + "r": 17 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 5, + "r": 17 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 6, + "r": 17 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 7, + "r": 17 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 8, + "r": 17 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 9, + "r": 17 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 10, + "r": 17 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 11, + "r": 17 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 12, + "r": 17 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 13, + "r": 17 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 14, + "r": 17 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 15, + "r": 17 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 16, + "r": 17 + }, + "terrain_type": 1 + }, + { + "pos": { + "q": 17, + "r": 17 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 18, + "r": 17 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 19, + "r": 17 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 20, + "r": 17 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 0, + "r": 18 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 1, + "r": 18 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 2, + "r": 18 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 3, + "r": 18 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 4, + "r": 18 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 5, + "r": 18 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 6, + "r": 18 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 7, + "r": 18 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 8, + "r": 18 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 9, + "r": 18 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 10, + "r": 18 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 11, + "r": 18 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 12, + "r": 18 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 13, + "r": 18 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 14, + "r": 18 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 15, + "r": 18 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 16, + "r": 18 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 17, + "r": 18 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 18, + "r": 18 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 19, + "r": 18 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 0, + "r": 19 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 1, + "r": 19 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 2, + "r": 19 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 3, + "r": 19 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 4, + "r": 19 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 5, + "r": 19 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 6, + "r": 19 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 7, + "r": 19 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 8, + "r": 19 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 9, + "r": 19 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 10, + "r": 19 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 11, + "r": 19 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 12, + "r": 19 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 13, + "r": 19 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 14, + "r": 19 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 15, + "r": 19 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 16, + "r": 19 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 17, + "r": 19 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 18, + "r": 19 + }, + "terrain_type": 3 + }, + { + "pos": { + "q": 19, + "r": 19 + }, + "terrain_type": 0 + } +] \ No newline at end of file diff --git a/tests/__pycache__/test_yes_cmdr_env.cpython-310.pyc b/tests/__pycache__/test_yes_cmdr_env.cpython-310.pyc new file mode 100644 index 0000000..1f44bc2 Binary files /dev/null and b/tests/__pycache__/test_yes_cmdr_env.cpython-310.pyc differ diff --git a/tests/__pycache__/test_yes_cmdr_env.cpython-311.pyc b/tests/__pycache__/test_yes_cmdr_env.cpython-311.pyc new file mode 100644 index 0000000..bece98b Binary files /dev/null and b/tests/__pycache__/test_yes_cmdr_env.cpython-311.pyc differ diff --git a/tests/__pycache__/test_yes_cmdr_env.cpython-39.pyc b/tests/__pycache__/test_yes_cmdr_env.cpython-39.pyc new file mode 100644 index 0000000..3728921 Binary files /dev/null and b/tests/__pycache__/test_yes_cmdr_env.cpython-39.pyc differ diff --git a/tests/test_yes_cmdr_env.py b/tests/test_yes_cmdr_env.py new file mode 100644 index 0000000..542b879 --- /dev/null +++ b/tests/test_yes_cmdr_env.py @@ -0,0 +1,89 @@ +import sys +sys.path.append("../") + +from yes_cmdr.yes_cmdr_env import YesCmdrEnv +from yes_cmdr.common.agent import Action, Command, ActionType + +def test_action_id_transform(env): + print("Testing action id transform") + env.reset() + for idx in range(env.action_space_size): + assert env.action_to_id(env.id_to_action(idx)) == idx, f"Expected {env.id_to_action(idx)} as {idx} but got {env.action_to_id(env.id_to_action(idx))}" + + env.step(0) + + for idx in range(env.action_space_size): + assert env.action_to_id(env.id_to_action(idx)) == idx, f"Expected {env.id_to_action(idx)} as {idx} but got {env.action_to_id(env.id_to_action(idx))}" + +def test_random_action(env): + print("Testing random action") + env.reset() + for i in range(100): + action = env.random_action() + assert env.check_validity(action)[0], f"Action {action} is not valid" + env.step(action) + +def test_bot_action(env): + print("Testing bot action") + env.reset() + for i in range(100): + action = env.bot_action() + assert env.check_validity(action)[0], f"Action {action} is not valid" + env.step(action) + +def test_command(env): + env.reset() + + done = False + step_count = 1 + + for idx in range(env.action_space_size): + assert env.action_to_id(env.id_to_action(idx)) == idx, f"Expected {env.id_to_action(idx)} as {idx} but got {env.action_to_id(env.id_to_action(idx))}" + + agents_data = env.teams[0].agents + agents = agents_data.values() + + for agent in agents: + print(agent.to_dict()) + + env.make_plan(Command(agent_id="9f19", target_id="95f3", attack_count=1, action_type=ActionType.ATTACK, start_time=1)) + env.make_plan(Command(agent_id="9f19", target_id="2477", attack_count=1, action_type=ActionType.ATTACK, start_time=5)) + for idx, action in enumerate(agents_data["9f19"].todo): + print(action) + + env.make_plan(Command(agent_id="cc4c", target_id="c53f", attack_count=4, action_type=ActionType.ATTACK)) + for idx, action in enumerate(agents_data["cc4c"].todo): + print(action) + + while not done and step_count <= 20: + for agent in agents: + if agent.todo and agent.todo[0].start_time <= step_count: + action = env.todo_action(agent.agent_id) + print(action) + obs, reward, terminated, truncated, info = env.step(action) + print(f"Step {step_count}: Action {action.action_type.name} -> Reward: {reward}, Done: {done}") + print(f"Info: {info}") + if "exception" in info: + agent.todo.clear() + + env.step(0) + env.step(0) + + step_count += 1 + + env.close() + +if __name__ == "__main__": + # 配置环境参数 + env = YesCmdrEnv( + data_path="./data/test", + max_team=5, + max_faction=4, + war_fog=False + ) + + test_action_id_transform(env) + test_random_action(env) + test_bot_action(env) + # test_command(env) + diff --git a/yes_cmdr_env.py b/yes_cmdr_env.py new file mode 100644 index 0000000..3c8d1d4 --- /dev/null +++ b/yes_cmdr_env.py @@ -0,0 +1,1267 @@ +import numpy as np +import json +import os +from datetime import datetime +from typing import List, Optional +import gymnasium as gym +from .common.utils import * +from .common.agent import * + +class YesCmdrEnv(gym.Env): + metadata = {"render_modes": ["human", "rgb_array"], "render_fps": 4} + + def __init__(self, + use_real_engine: bool = False, + replay_path: str = None, + campaign_id: str = "NOID", + team_id: int = 0, + max_team: int = 2, + max_faction: int = 2, + data_path: str = "../data", + max_episode_steps: int = 1000, + bot_version: str = "v0", + war_fog: bool = True) -> None: + self._init_flag = False + # set other properties... + self.use_real_engine = use_real_engine + self.replay_path = replay_path + self.campaign_id = campaign_id + self.init_team_id = team_id + self.max_team = max_team + self.max_faction = max_faction + self.data_path = data_path + self.max_episode_steps = max_episode_steps + self.bot_version = bot_version + self.war_fog = war_fog + + def reset(self): + # reset the environment... + # 实际的环境初始化是在第一次调用 reset 方法时进行的 + if not self._init_flag: + self._init_flag = True + self._make_env() + if self.replay_path is not None: + from .common.renderer import Renderer + self._renderer = Renderer(self.map.nodes.values()) + else: + for node in self.map.nodes.values(): + node.reset() + for team in self.teams: + for agent_id, agent in team.agents.items(): + agent.reset() + if agent.pos == (-1, -1): # Illegal position for padding + continue + assert agent.pos in self.map.nodes, f"Invalid agent position: {agent.pos}" + self.map.nodes[agent.pos].agent_id = agent_id + self.map.nodes[agent.pos].team_id = team.team_id + if self.replay_path is not None: + self._frames = [[], []] + self._legal_actions = None + self._legal_action_ids = None + self._spotted_enemy_ids = None + self._team_id = self.init_team_id + self._spotted_agents = None + self.episode_steps = 0 + self._cumulative_rewards = np.zeros(2) + + obs = self.observe() + info = { + "next_team": self.team_id + 1 + } + + return obs, info + + def step(self, action: Union[int, Action]): + if isinstance(action, int): + if action not in self.legal_action_ids: + print(f"Invalid action will be ignored: {action}") + info = {} + info["accum_reward"] = (self._cumulative_rewards[0], self._cumulative_rewards[1]) + info["next_team"] = self.team_id + 1 + info["exception"] = f"Invalid action {action}" + return self.observe(), 0, False, False, info + action = self.id_to_action(action) + elif isinstance(action, Action): + valid, reason = self.check_validity(action) + if not valid: + if action.action_type == ActionType.MOVE and reason.find("occupied") > 0: + print("Trying to move to an adjacent position that is empty.") + dis = float('inf') + for _action in self.legal_actions: + if _action.agent_id == action.agent_id and _action.action_type == ActionType.MOVE and get_axial_dis(_action.des, action.des) < dis: + dis = get_axial_dis(_action.des, action.des) + action = _action + else: + print(f"Invalid action will be ignored: {action}") + print(f"Reason: {reason}") + info = {} + info["accum_reward"] = (self._cumulative_rewards[0], self._cumulative_rewards[1]) + info["next_team"] = self.team_id + 1 + info["exception"] = f"Invalid action: {action}. Reason: {reason}" + return self.observe(), 0, False, False, info + + assert(isinstance(action, Action)) + + reward, terminated = self._player_step(action) + + """ + NOTE: Clear the states before observation + """ + self._legal_actions = None + self._legal_action_ids = None + self._spotted_enemy_ids = None + + obs = self.observe() + truncated = self.episode_steps >= self.max_episode_steps + self._cumulative_rewards[self.team_id] += reward + + info = {} + info["accum_reward"] = (self._cumulative_rewards[0], self._cumulative_rewards[1]) + info["next_team"] = self.team_id + 1 if not terminated else -1 + + if self.replay_path is not None: + self._frames[self.team_id].append(self.render(mode="rgb_array")) + + if terminated or truncated: + # The eval_episode_return is calculated from Player 1's perspective + info["eval_episode_return"] = reward if self.team_id == 0 else -reward + info["done_reason"] = "Terminated" if terminated else "Truncated" + + if self.replay_path is not None: + self.save_replay() + + return obs, reward, terminated, truncated, info + + def render(self, mode="human", attack_agent=None, defend_agent=None): + if mode == "rgb_array": + player_agents = self.teams[self.team_id].agents.values() + return self._renderer.render(self.current_agents, self.spotted_enemy_agents, attack_agent, defend_agent) + elif mode == "human": + return self.observe() + else: + raise ValueError("Invalid render mode: {}".format(mode)) + + def action_to_id(self, action: Action) -> int: + if action.action_type == ActionType.END_OF_TURN: + return 0 + action_space = self.action_space + if action.agent_id not in action_space.keys(): + raise ValueError(f"Invalid agent_id: {action.agent_id[:8]}") + agent_action_space = action_space[action.agent_id] + if action.action_type not in agent_action_space.keys(): + agent = self.get_agent(action.agent_id) + print(agent.available_types, agent.parked_agents) + print(action) + raise ValueError(f"Invalid action_type: {action.action_type.name} for agent {action.agent_id[:8]}. Available action_types: {agent_action_space.keys()}") + action_id = 1 + for agent_id, agent_action_space in action_space.items(): + for action_type, action_space in agent_action_space.items(): + if agent_id != action.agent_id or action_type != action.action_type: + action_id += action_space.n + else: + if action.action_type == ActionType.SWITCH_WEAPON: # 以下标编码,不以位置编码 + assert action.weapon_idx < action_space.n, f"Invalid weapon_idx: {action.weapon_idx}, action_space.n: {action_space.n}" + action_id += action.weapon_idx + elif action.action_type == ActionType.RELEASE: + assert action.target_id < action_space.n, f"Invalid target_idx: {action.target_id}, action_space.n: {action_space.n}" + action_id += action.target_id # 此处为下标 + else: + pos, des = self.get_agent(action.agent_id).pos, action.des + encode_id = encode_axial((des[0] - pos[0], des[1] - pos[1])) + action_id += encode_id - 1 + if action.action_type == ActionType.MOVE: + cur_range = self.get_agent(action.agent_id).mobility + elif action.action_type == ActionType.ATTACK: + cur_range = self.get_agent(action.agent_id).attack_range + elif action.action_type == ActionType.INTERACT: + cur_range = 1 + assert encode_id - 1 < action_space.n, f"Action {action.action_type.name} Distance: {get_axial_dis(pos, des)}, cur_range: {cur_range} encode_id: {encode_id} action_space.n: {action_space.n}\n{action}" + return int(action_id) + assert False, "Should not reach here" + + def id_to_action(self, action_id: int) -> Action: + if action_id == 0: + return Action(ActionType.END_OF_TURN) + action_id -= 1 + action_space = self.action_space + for agent_id, agent_action_space in action_space.items(): + for action_type, action_space in agent_action_space.items(): + if action_id < action_space.n: + if action_type == ActionType.SWITCH_WEAPON: + return Action(agent_id=agent_id, action_type=action_type, weapon_idx=action_id, weapon_name=self.get_agent(agent_id).weapon.name) + elif action_type == ActionType.RELEASE: + return Action(agent_id=agent_id, target_id=action_id, action_type=action_type) + else: + pos = self.get_agent(agent_id).pos + delta = decode_axial(action_id + 1) # 编码 0 是原位置 + des = (pos[0] + delta[0], pos[1] + delta[1]) + return Action(agent_id=agent_id, des=des, action_type=action_type) + else: + action_id -= action_space.n + raise ValueError("Invalid action_id: {}".format(action_id)) + + def get_agent(self, agent_id: str) -> Agent: + for team in self.teams: + if agent_id in team.agents.keys(): + agent = team.agents[agent_id] + # if not agent.alive: + # raise ValueError(f"Agent {agent_id} is not alive") + return agent + raise ValueError(f"Invalid agent id: {agent_id}") + + def get_node(self, pos: Tuple[int, int]) -> TileNode: + if pos not in self.map.nodes: + raise ValueError(f"Invalid position: {pos}") + return self.map.nodes[pos] + + @property + def legal_actions(self) -> List[Action]: + if self._legal_actions is not None: + return self._legal_actions + + map_data = self.map.nodes + player_data = self.current_agents_dict + spotted_enemy_ids = self.spotted_enemy_ids + + _legal_actions = [Action(ActionType.END_OF_TURN)] # Default action + + for agent_id, agent in player_data.items(): + if agent.commenced_action or not agent.alive or agent.pos == (-1, -1) or agent.is_carried: + continue + pos = agent.pos + # 移动 + mobility = min(agent.fuel, agent.mobility) + if mobility > 0: + adj_pos = get_adj_pos(pos, int(mobility)) + aval_data = { + pos: node + for pos in adj_pos if pos in map_data and + ( + (node := map_data[pos]).team_id != self.enemy_id + or + node.agent_id not in spotted_enemy_ids + ) + } + aval_nodes = astar_search(aval_data, agent.move_type, start=pos, limit=mobility) + for des in aval_nodes: + if des == pos: + continue + node = map_data[des] + if node.team_id != self.team_id and node.agent_id not in spotted_enemy_ids: + # 一个格子只能有一个单位 + _legal_actions.append(Action( + agent_id=agent_id, + des=des, + action_type=ActionType.MOVE + )) + elif node.team_id == self.team_id and agent.agent_type in (target := self.get_agent(node.agent_id)).available_types \ + and len(target.parked_agents) < target.capacity: + # 除非终点有可以停靠的单位 + _legal_actions.append(Action( + agent_id=agent_id, + target_id=node.agent_id, + des=des, + action_type=ActionType.MOVE + )) + # 进攻 + if agent.attack_range > 0 and agent.ammo > 0: + for des in get_adj_pos(pos, agent.attack_range): + if des not in map_data: + continue + node = map_data[des] + if node.agent_id in spotted_enemy_ids and self.get_agent(node.agent_id).move_type in agent.strike_types: + # 已发现敌军 + _legal_actions.append(Action( + agent_id=agent_id, + target_id=node.agent_id, + action_type=ActionType.ATTACK, + des=des + )) + # 交互 + for des in get_adj_pos(pos, 1): + if des not in map_data: + continue + node = map_data[des] + if node.team_id == self.team_id and agent.agent_type in (target :=self.get_agent(node.agent_id)).available_types \ + and len(target.parked_agents) < target.capacity: + _legal_actions.append(Action( + agent_id=agent_id, + target_id=node.agent_id, + des=des, + action_type=ActionType.INTERACT + )) + # 释放 + if agent.parked_agents: + for des in get_adj_pos(pos, 1): + if des not in map_data: + continue + node = map_data[des] + if node.team_id != -1: + continue + for idx, agent_to_release in enumerate(agent.parked_agents): + if get_cost(agent_to_release.move_type, node.terrain_type) != 0: + _legal_actions.append(Action( + agent_id=agent_id, + target_id=idx, + des=des, + action_type=ActionType.RELEASE + )) + break + # 切换武器 + if agent.switchable_weapons: + for des in get_adj_pos(pos, 1): + if des not in map_data: + continue + node = map_data[des] + if node.team_id == self.team_id and self.get_agent(node.agent_id).has_supply: + for weapon_idx, weapon in enumerate(agent.switchable_weapons): + if not agent.weapon or weapon.name != agent.weapon.name: + _legal_actions.append(Action( + agent_id=agent_id, + target_id=weapon.name, + weapon_idx=weapon_idx, + action_type=ActionType.SWITCH_WEAPON + )) + + + self._legal_actions = _legal_actions + return self._legal_actions + + @property + def legal_action_ids(self) -> List[int]: + if self._legal_action_ids is not None: + return self._legal_action_ids + self._legal_action_ids = [self.action_to_id(action) for action in self.legal_actions] + return self._legal_action_ids + + @property + def spotted_enemy_ids(self) -> List[str]: + if not self.war_fog: + return self.teams[self.enemy_id].alive_ids + + if self._spotted_enemy_ids is not None: + return self._spotted_enemy_ids + + player_agents = self.current_agents + enemy_data = self.enemy_agents_dict + map_data = self.map.nodes + _spotted_enemy_ids = [] + + for agent in player_agents: + info_level = agent.info_level + for des in get_adj_pos(agent.pos, agent.info_level): + if des not in map_data: + continue + node = map_data[des] + dis = get_axial_dis(agent.pos, des) + if dis == 1: # Adjacent agents are always spotted + dis = -1e9 + if node.team_id == self.enemy_id \ + and enemy_data[node.agent_id].stealth_level < info_level - dis + 1: + _spotted_enemy_ids.append(node.agent_id) + + self._spotted_enemy_ids = _spotted_enemy_ids + return self._spotted_enemy_ids + + def save_replay(self): + if not os.path.exists(self.replay_path): + os.makedirs(self.replay_path) + timestamp = datetime.now().strftime("%Y%m%d%H%M%S") + for team_id in range(self.max_team): + path = os.path.join( + self.replay_path, + f"tianqiong_{timestamp}_Team{team_id + 1}.mp4" + ) + self.display_frames_as_mp4(self._frames[team_id], path) + print(f'replay {path} saved!') + + @staticmethod + def display_frames_as_mp4(frames: list, path: str, fps=4) -> None: + assert path.endswith('.mp4'), f'path must end with .mp4, but got {path}' + import imageio + imageio.mimwrite(path, frames, fps=fps) + + def _make_env(self): + # Configuration for file paths + map_path = f"{self.data_path}/MapInfo.json" + agents_path = f"{self.data_path}/AgentsInfo.json" + + # Load map and agents information + with open(map_path, 'r') as f: + origin_map = json.load(f) + with open(agents_path, 'r') as f: + origin_agents = json.load(f) + + _nodes = { + (node := dict_to_node(node_dict)).pos: node for node_dict in origin_map + } + agents_dict_lists = [ + [agent_dict for agent_dict in origin_agents if agent_dict["team_id"] == team_id] + for team_id in range(self.max_team) + ] + _agents = [ + { + (agent := dict_to_agent(agent_dict)).agent_id: agent + for agent_dict in agents_dict_lists[team_id] + } + for team_id in range(self.max_team) + ] + _teams = [ + Team( + team_id=_team_id, + faction_id=_agents[_team_id][0].faction_id, + agents=_agents[_team_id] + ) + for _team_id in range(self.max_team) + ] + + for team in _teams: + for agent_id, agent in team.agents.items(): + if agent.pos == (-1, -1): # Illegal position for padding + continue + assert agent.pos in _nodes, f"Invalid agent position: {agent.pos}" + _nodes[agent.pos].agent_id = agent_id + _nodes[agent.pos].team_id = team.team_id + + _map = Map(_nodes) + self.map = _map + self.teams = _teams + self.obs_shape = _map.width, _map.height + + # Get action space + self._action_spaces = [ + gym.spaces.Dict( + { + f"team_{team.team_id + 1}": gym.spaces.Dict( + { + ActionType.END_OF_TURN: gym.spaces.Discrete(1) # Default action + } + ), + **{ + agent_id: agent.action_space for agent_id, agent in team.agents.items() + } + } + ) + for team in self.teams + ] + + self._action_space_sizes = [ + sum(action_space.n for agent_action_space in self._action_spaces[team_id].values() for action_space in agent_action_space.values()) + for team_id in range(self.max_team) + ] + + self._team_id = 0 + for action_space_size in self._action_space_sizes: + for action_id in range(action_space_size): + assert self.action_to_id(self.id_to_action(action_id)) == action_id + self._team_id += 1 + + # Get observation space + self._observation_spaces = [ + gym.spaces.Dict( + { + "action_mask": gym.spaces.Box(0, 1, (self._action_space_sizes[team_id], ), dtype=np.int8), # Different size for each team + "union_endurance": gym.spaces.Box(0, 1000, self.obs_shape, dtype=np.float32), + "union_info_level": gym.spaces.Box(0, 1000, self.obs_shape, dtype=np.int16), + "union_stealth_level": gym.spaces.Box(0, 1000, self.obs_shape, dtype=np.int16), + "union_mobility": gym.spaces.Box(0, 1000, self.obs_shape, dtype=np.float32), + "union_defense": gym.spaces.Box(0, 1000, self.obs_shape, dtype=np.int16), + "union_damage": gym.spaces.Box(0, 1000, self.obs_shape, dtype=np.float32), + "union_fuel": gym.spaces.Box(0, 1000, self.obs_shape, dtype=np.float32), + "union_ammo": gym.spaces.Box(0, 1000, self.obs_shape, dtype=np.int16), + "enemy_endurance": gym.spaces.Box(0, 1000, self.obs_shape, dtype=np.float32), + "enemy_info_level": gym.spaces.Box(0, 1000, self.obs_shape, dtype=np.int16), + "enemy_stealth_level": gym.spaces.Box(0, 1000, self.obs_shape, dtype=np.int16), + "enemy_mobility": gym.spaces.Box(0, 1000, self.obs_shape, dtype=np.float32), + "enemy_defense": gym.spaces.Box(0, 1000, self.obs_shape, dtype=np.int16), + "enemy_damage": gym.spaces.Box(0, 1000, self.obs_shape, dtype=np.float32), + "enemy_fuel": gym.spaces.Box(0, 1000, self.obs_shape, dtype=np.float32), + "enemy_ammo": gym.spaces.Box(0, 1000, self.obs_shape, dtype=np.int16) + } + ) + for team_id in range(self.max_team) + ] + + self._reward_space = gym.spaces.Box(low=-1000, high=1000, shape=(1, ), dtype=np.float32) + + # 预处理己方联盟和所有敌方的单位信息 + self._union_agents = [[] for _ in range(self.max_faction)] + self._union_agents_dict = [{} for _ in range(self.max_faction)] + self._enemy_agents = [[] for _ in range(self.max_faction)] + self._enemy_agents_dict = [{} for _ in range(self.max_faction)] + for team_id in range(self.max_team): + faction_id = self.teams[team_id].faction_id + agents_list = list(self.teams[team_id].agents.values()) + agents_dict = self.teams[team_id].agents + self._union_agents[faction_id].extend(agents_list) + for other_faction_id in range(self.max_faction): + if other_faction_id != faction_id: + self._enemy_agents[other_faction_id].extend(agents_list) + self._enemy_agents_dict[other_faction_id] |= agents_dict + + + @property + def team_id(self): + return self._team_id + + @property + def current_agents(self): + return self.teams[self._team_id].agents.values() + + @property + def current_agents_dict(self): + return self.teams[self._team_id].agents + + @property + def enemy_agents(self): + return self._enemy_agents[self.faction_id] + # if self._enemy_agents is not None: + # return self._enemy_agents + # _enemy_agents = [] + # for team_id in range(self.max_team): + # if self.teams[team_id].faction_id != self.faction_id: + # _enemy_agents.extend(list(self.teams[team_id].agents.values())) + # self._enemy_agents = _enemy_agents + # return self._enemy_agents + + @property + def enemy_agents_dict(self): + return self._enemy_agents_dict[self.faction_id] + # if self._enemy_agents_dict is not None: + # return self._enemy_agents_dict + # _enemy_agents_dict = {} + # for team_id in range(self.max_team): + # if self.teams[team_id].faction_id != self.faction_id: + # for agent_id, agent in self.teams[team_id].agents.items(): + # _enemy_agents_dict[agent_id] = agent + # self._enemy_agents_dict = _enemy_agents_dicct + # return self._enemy_agents_dict + + @property + def spotted_enemy_agents(self): + if self._spotted_enemy_agents is not None: + return self._spotted_enemy_agents + _spotted_enemy_agents = [self.enemy_agents_dict[enemy_id] for enemy_id in self.spotted_enemy_ids] + self._spotted_enemy_agents = _spotted_enemy_agents + + @property + def faction_id(self): + return self.teams[self._team_id].faction_id + + @property + def observation_space(self): + return self._observation_spaces[self.team_id] + + @property + def action_space(self): + return self._action_spaces[self.team_id] + + @property + def action_space_size(self): + return self._action_space_sizes[self.team_id] + + @property + def reward_space(self): + return self._reward_space + + def random_action(self) -> Action: + return np.random.choice(self.legal_actions) + + def next_turn(self): + self._team_id = (self._team_id + 1) % self.max_team + + def bot_action(self) -> Action: + if self.bot_version == "v0": + attack_actions = [action for action in self.legal_actions if action.action_type == ActionType.ATTACK] + if attack_actions: + return np.random.choice(attack_actions) + else: + return self.random_action() + else: + raise NotImplementedError(f"Invalid bot version: {self.bot_version}") + + def _player_step(self, action: Action): + if action.action_type == ActionType.END_OF_TURN: + """ + NOTE: here exchange the player + """ + # 清除动作标记 + for agent in self.current_agents: + agent.commenced_action = False + # 切换玩家 + self.episode_steps += 1 + if not self.use_real_engine: + self.next_turn() + reward, terminated = 0, False # 这里可以加上惩罚项 + elif action.action_type == ActionType.MOVE: + reward = self._move(action.agent_id, action.des, action.target_id) + terminated = False + elif action.action_type == ActionType.ATTACK: + reward, terminated = self._attack(action.agent_id, action.target_id) + elif action.action_type == ActionType.INTERACT: + reward = self._interact(action.agent_id, action.target_id) + terminated = False + elif action.action_type == ActionType.RELEASE: + reward = self._release(action.agent_id, action.target_id, action.des) + terminated = False + elif action.action_type == ActionType.SWITCH_WEAPON: + reward = self._switch_weapon(action.agent_id, action.weapon_idx) + terminated = False + else: + raise ValueError(f"Invalid action type: {action.action_type}") + + for agent in self.current_agents: + for pos in get_adj_pos(agent.pos, 1): + if pos in self.map.nodes and self.map.nodes[pos].team_id == self.team_id: + adj_agent = self.get_agent(self.map.nodes[pos].agent_id) + if not adj_agent.has_supply: + continue + module = adj_agent.supply + if module.add_endurance > 0: + agent.endurance += min(module.add_endurance, agent.max_endurance - agent.endurance) + else: + agent.endurance = agent.max_endurance + if agent.weapon is not None: + if module.add_ammo > 0: + agent.weapon.ammo += min(module.add_ammo, agent.weapon.max_ammo - agent.weapon.ammo) + else: + agent.weapon.ammo = agent.weapon.max_ammo + if module.add_fuel > 0: + agent.fuel += min(module.add_fuel, agent.max_fuel - agent.fuel) + else: + agent.fuel = agent.max_fuel + + return reward, terminated + + def _move(self, agent_id, des, target_id): + map_data = self.map.nodes + agent = self.current_agents_dict[agent_id] + spotted_enemy_ids = self.spotted_enemy_ids + origin_pos = agent.pos + # 清除原位置关联 + node = map_data[agent.pos] + node.agent_id = None + node.team_id = -1 + + mobility = min(agent.mobility, agent.fuel) + adj_pos = get_adj_pos(agent.pos, int(mobility)) + aval_data = { + pos: node + for pos in adj_pos if pos in map_data and + ( + (node := map_data[pos]).team_id != self.enemy_id + or + node.agent_id not in spotted_enemy_ids + ) + } + path = astar_search(aval_data, move_type=agent.move_type, start=agent.pos, goal=des, limit=mobility) + path = [agent.pos] + path + + exception = False + + for cur, nxt in zip(path[:-1], path[1:]): + if self.war_fog and map_data[nxt].team_id == self.enemy_id: # 前进方向遇到敌军,停下 + exception = True + break + if self.replay_path is not None: + self._frames[self.team_id].append(self.render(mode="rgb_array")) + agent.fuel -= get_cost(agent.move_type, map_data[nxt].terrain_type) + agent.pos = nxt + + # 更新位置以及与地图的关联 + cur = agent.pos + agent.commenced_action = True + + if map_data[cur].team_id == -1: + map_data[cur].agent_id = agent_id + map_data[cur].team_id = self.team_id + elif map_data[cur].agent_id == target_id: + carry_agent = self.get_agent(target_id) + assert agent.team_id == carry_agent.team_id + assert agent.agent_type in carry_agent.available_types, f"{agent} not in {carry_agent.available_types} (id: {carry_agent.agent_id[:8]}). " + assert len(carry_agent.parked_agents) < carry_agent.capacity, f"Agent {carry_agent[:8]} is available but full" + carry_agent.parked_agents.append(agent) + agent.is_carried = True + else: + assert exception + + return 0 + + def _attack(self, attack_id, target_id): + agent = self.current_agents[attack_id] + target_agent = self.teams[self.enemy_id].agents[target_id] + + if self.replay_path is not None: + self._frames[self.team_id].extend([self.render(mode="rgb_array", attack_agent=agent, defend_agent=target_agent)] * 4) + + damage = max(agent.damage - target_agent.defense, 0) + damage = min(damage, target_agent.endurance) + target_agent.endurance -= damage + agent.weapon.ammo -= 1 + agent.commenced_action = True + + if target_agent.endurance <= 0: + # 删除与地图关联 + node = self.get_node(target_agent.pos) + node.agent_id = None + node.team_id = -1 + + if self.teams[self.enemy_id].alive_count == 0: + return damage, True + + return damage, False + + def _interact(self, agent_id, target_id): + agent = self.get_agent(agent_id) + target = self.get_agent(target_id) + target.parked_agents.append(agent) + agent.is_carried = True + return 0 # 后续可以考虑修改奖励为补给的线性组合 + + def _release(self, agent_id: str, target_idx: int, des: Tuple[int, int]): + agent = self.get_agent(agent_id) + agent_to_release = agent.parked_agents[target_idx] + agent_to_release.pos = des + node = self.get_node(des) + node.team_id = agent_to_release.team_id + node.agent_id = agent_to_release.agent_id + + for module in agent.modules: + if module.add_endurance > 0: + agent_to_release.endurance += min(module.add_endurance, agent_to_release.max_endurance - agent_to_release.endurance) + else: + agent_to_release.endurance = agent_to_release.max_endurance + if agent_to_release.weapon is not None: + if module.add_ammo > 0: + agent_to_release.weapon.ammo += min(module.add_ammo, agent_to_release.weapon.max_ammo - agent_to_release.weapon.ammo) + else: + agent_to_release.weapon.ammo = agent_to_release.weapon.max_ammo + if module.add_fuel > 0: + agent_to_release.fuel += min(module.add_fuel, agent_to_release.max_fuel - agent_to_release.fuel) + else: + agent_to_release.fuel = agent_to_release.max_fuel + agent_to_release.is_carried = False + + return 0 # 后续可以考虑修改奖励为补给的线性组合 + + def _switch_weapon(self, agent_id: str, weapon_idx: int): + agent = self.get_agent(agent_id) + agent.weapon.reset() + agent.weapon = agent.switchable_weapons[weapon_idx] + return 0 + + def observe(self): + player_agents = self.current_agents + spotted_enemy_agents = self.spotted_enemy_agents + + obs = {} + + for desc, space in self.observation_space.items(): + obs[desc] = np.zeros(space.shape, dtype=space.dtype) + + legal_actions_ids = self.legal_action_ids + + for action_id in legal_actions_ids: + obs["action_mask"][action_id] = 1 + + for agent in player_agents: + pos = agent.pos + obs["player_info_level"][pos[0], pos[1]] = agent.info_level + obs["player_stealth_level"][pos[0], pos[1]] = agent.stealth_level + obs["player_mobility"][pos[0], pos[1]] = agent.mobility + obs["player_defense"][pos[0], pos[1]] = agent.defense + obs["player_damage"][pos[0], pos[1]] = agent.damage + obs["player_fuel"][pos[0], pos[1]] = agent.fuel + obs["player_ammo"][pos[0], pos[1]] = agent.ammo + obs["player_endurance"][pos[0], pos[1]] = agent.endurance + + for agent in spotted_enemy_agents: + pos = agent.pos + obs["enemy_info_level"][pos[0], pos[1]] = agent.info_level + obs["enemy_stealth_level"][pos[0], pos[1]] = agent.stealth_level + obs["enemy_mobility"][pos[0], pos[1]] = agent.mobility + obs["enemy_defense"][pos[0], pos[1]] = agent.defense + obs["enemy_damage"][pos[0], pos[1]] = agent.damage + obs["enemy_fuel"][pos[0], pos[1]] = agent.fuel + obs["enemy_ammo"][pos[0], pos[1]] = agent.ammo + obs["enemy_endurance"][pos[0], pos[1]] = agent.endurance + + return obs + + def command_move(self, agent_id, des: Union[Tuple[int, int], List[Tuple[int, int]]], start_time=0, subtask=False) -> Command: + agent = self.get_agent(agent_id) + state = agent.todo[-1].state if agent.todo else agent.state + if state.pos == des or state.pos in des: + return + + map_data = self.map.nodes + origin_todo_length = len(agent.todo) # 记录原 todo 的长度 + spotted_enemy_ids = self.spotted_enemy_ids + adj_pos = get_adj_pos(state.pos, int(state.fuel)) + adj_pos.append(state.pos) + aval_data = { + pos: node + for pos in adj_pos if pos in map_data and + ( + (node := map_data[pos]).team_id != self.enemy_id + or + node.agent_id not in spotted_enemy_ids + ) + } + path = get_path(agent=agent, map_data=aval_data, start=state.pos, des=des, limit=state.fuel) # 可能没有可行路径 + + for pos in path: + state.fuel -= get_cost(agent.move_type, map_data[pos].terrain_type) + state.pos = pos + agent.todo.append(Action(agent_id=agent_id, action_type=ActionType.MOVE, des=pos, start_time=start_time, state=state)) + + command = Command(agent_id=agent_id, + action_type=ActionType.MOVE, + des=des, + start_time=start_time, + end_time=start_time+len(agent.todo)-origin_todo_length-1, + state=state) + + if not subtask: + end_time = command.end_time + for i in range(origin_todo_length, len(agent.todo)): + agent.todo[i].start_time = start_time + i - origin_todo_length + agent.todo[i].end_time = end_time + agent.todo[-1].end_type = ActionType.MOVE + agent.cmd_todo.append(command) + + return command + + def command_attack(self, attack_id, target_id, attack_count=1, start_time=0, subtask=False) -> Command: + agent = self.get_agent(attack_id) + target_agent = self.get_agent(target_id) + + origin_todo_length = len(agent.todo) # 记录原 todo 的长度 + + state = agent.todo[-1].state if agent.todo else agent.state + + if target_agent.move_type not in state.weapon.strike_types: + flag = False + no_enough_ammo = False + for weapon_idx, weapon in enumerate(agent.switchable_weapons): + if weapon.strike_types and target_agent.move_type in weapon.strike_types: + if weapon.max_ammo < attack_count: + no_enough_ammo = True + try: + self.command_switch(attack_id, weapon_idx, start_time=start_time, subtask=True) + flag = True + break + except Exception as e: + agent.todo = agent.todo[:origin_todo_length] + raise ValueError("Failed switching to the proper weapon") + if not flag: + if no_enough_ammo: + agent.todo = agent.todo[:origin_todo_length] + raise ValueError("Agent has the striking weapon but no one with enough ammo") + else: + agent.todo = agent.todo[:origin_todo_length] + raise ValueError("No proper weapon found") + + state = agent.todo[-1].state if agent.todo else agent.state + + if state.weapon.ammo < attack_count: + if state.weapon.max_ammo >= attack_count: # 不需要换武器,尝试补给 + try: + self.command_supply(attack_id, start_time=start_time, subtask=True) + except Exception as e: + agent.todo = agent.todo[:origin_todo_length] + raise ValueError("No enough ammo and failed to get supplies") + else: # 需要换武器 + flag = False + for weapon_idx, weapon in enumerate(agent.switchable_weapons): + if weapon.strike_types and target_agent.move_type in weapon.strike_types and weapon.max_ammo >= attack_count: + try: + self.command_switch(attack_id, weapon_idx, start_time=start_time, subtask=True) + flag = True + break + except Exception as e: + agent.todo = agent.todo[:origin_todo_length] + raise ValueError("Failed switching to the proper weapon") + if not flag: + agent.todo = agent.todo[:origin_todo_length] + raise ValueError("No proper weapon has enough ammo") + + map_data = self.map.nodes + spotted_enemy_ids = self.spotted_enemy_ids + possible_des = [ + pos for pos in get_adj_pos(target_agent.pos, int(agent.attack_range)) + if pos in map_data and + ( + (node := map_data[pos]).team_id != self.enemy_id + or + node.agent_id not in spotted_enemy_ids + ) + ] + + try: + self.command_move(attack_id, possible_des, start_time=start_time, subtask=True) + except Exception as e: + agent.todo = agent.todo[:origin_todo_length] + raise ValueError("Failed to move to the target") + + state = agent.todo[-1].state if agent.todo else agent.state + + for _ in range(attack_count): + state.weapon.ammo -= 1 + agent.todo.append(Action(agent_id=attack_id, + action_type=ActionType.ATTACK, + target_id=target_id, + des=target_agent.pos, + start_time=start_time, + state=state)) + + command = Command(agent_id=attack_id, + action_type=ActionType.ATTACK, + target_id=target_id, + des=target_agent.pos, + attack_count=attack_count, + start_time=start_time, + end_time=start_time+len(agent.todo)-origin_todo_length-1, + state=state) + + if not subtask: + end_time = command.end_time + for i in range(origin_todo_length, len(agent.todo)): + agent.todo[i].start_time = start_time + i - origin_todo_length + agent.todo[i].end_time = end_time + agent.todo[-1].end_type = ActionType.ATTACK + agent.cmd_todo.append(command) + + return command + + def command_supply(self, agent_id, start_time=0, subtask=False) -> Command: + """ + 需要补给的单位移动至有补给模块的单位附近 + """ + agent = self.get_agent(agent_id) + player_agents = self.teams[agent.team_id].agents.values() + map_data = self.map.nodes + origin_todo_length = len(agent.todo) # 记录原 todo 的长度 + + possible_des = [] + + for _agent in player_agents: + if _agent.has_supply: + for pos in get_adj_pos(_agent.pos, 1): + if pos in map_data: + possible_des.append(pos) + + # print(possible_des) + + self.command_move(agent_id, possible_des, start_time, subtask=True) + + des = agent.todo[-1].des if agent.todo else agent.pos + for pos in get_adj_pos(des, 1): + if pos in map_data and map_data[pos].team_id == agent.team_id and self.get_agent(map_data[pos].agent_id).has_supply: + supply_id = map_data[pos].agent_id + supply_module = self.get_agent(supply_id).supply + + state = agent.todo[-1].state if agent.todo else agent.state + + if supply_module.add_endurance > 0: + state.endurance = min(agent.max_endurance, state.endurance + supply_module.add_endurance) + else: + state.endurance = agent.max_endurance + if supply_module.add_ammo > 0: + state.weapon.ammo = min(state.weapon.max_ammo, state.weapon.ammo + supply_module.add_ammo) + else: + state.weapon.ammo = state.weapon.max_ammo + if supply_module.add_fuel > 0: + state.fuel = min(agent.max_fuel, state.fuel + supply_module.add_fuel) + else: + state.fuel = agent.max_fuel + + command = Command(agent_id=agent_id, + action_type=ActionType.SUPPLY, + des=des, + start_time=start_time, + end_time=start_time+len(agent.todo)-origin_todo_length-1, + state=state) + + if not subtask: + end_time = command.end_time + for i in range(origin_todo_length, len(agent.todo)): + agent.todo[i].start_time = start_time + i - origin_todo_length + agent.todo[i].end_time = end_time + agent.todo[-1].end_type = ActionType.SUPPLY + agent.cmd_todo.append(command) + + return command + + def command_switch(self, agent_id, weapon_idx, start_time=0, subtask=False) -> Command: + agent = self.get_agent(agent_id) + origin_todo_length = len(agent.todo) # 记录原 todo 的长度 + self.command_supply(agent_id, subtask=True) + state = agent.todo[-1].state if agent.todo else agent.state + agent.todo.append(Action(agent_id=agent_id, + action_type=ActionType.SWITCH_WEAPON, + weapon_idx=weapon_idx, + weapon_name=agent.switchable_weapons[weapon_idx].name, + des=state.pos, + start_time=start_time, + state=state)) + + command = Command(agent_id=agent_id, + action_type=ActionType.SWITCH_WEAPON, + weapon_idx=weapon_idx, + weapon_name=agent.switchable_weapons[weapon_idx].name, + des=state.pos, + start_time=start_time, + end_time=start_time+len(agent.todo)-origin_todo_length-1, + state=state) + + if not subtask: + end_time = command.end_time + for i in range(origin_todo_length, len(agent.todo)): + agent.todo[i].start_time = start_time + i - origin_todo_length + agent.todo[i].end_time = end_time + agent.todo[-1].end_type = ActionType.SWITCH_WEAPON + agent.cmd_todo.append(command) + + return command + + def make_plan(self, command: Command, subtask=False): + if command.action_type == ActionType.MOVE: + return self.command_move(command.agent_id, command.des, start_time=command.start_time, subtask=subtask) + elif command.action_type == ActionType.ATTACK: + return self.command_attack(command.agent_id, command.target_id, attack_count=command.attack_count, start_time=command.start_time, subtask=subtask) + elif command.action_type == ActionType.SUPPLY: + return self.command_supply(command.agent_id, start_time=command.start_time, subtask=subtask) + elif command.action_type == ActionType.SWITCH_WEAPON: + return self.command_switch(command.agent_id, command.weapon_idx, start_time=command.start_time, subtask=subtask) + else: + raise ValueError(f"Invalid command type: {command.action_type}") + + def todo_action(self, agent_id: str, retry=0) -> Optional[Action]: + agent = self.get_agent(agent_id) + + if not agent.todo: + return None + + action = agent.todo.pop(0) + valid, msg = self.check_validity(action) + + if valid or retry > 2: + if agent.cmd_todo[0].action_type == action.end_type: + agent.cmd_todo.pop(0) + return action + + print(f"Retrying to plan actions for {agent_id}: {msg}") + + if action.action_type == ActionType.RELEASE: + for pos in get_adj_pos(action.des, 1): + if pos in self.map.nodes and not self.map.nodes[pos].agent_id: + action.des = pos + return action + return None # Wait + + command = agent.cmd_todo.pop(0) + + # 处理异常 + while agent.todo and agent.todo[0].end_type != command.action_type: + agent.todo.pop(0) + if agent.todo: + agent.todo.pop(0) + + current_actions = agent.todo.copy() + current_commands = agent.cmd_todo.copy() + + self.make_plan(command) + agent.todo.extend(current_actions) + agent.cmd_todo.extend(current_commands) + return self.todo_action(agent_id, retry=retry+1) + + def check_validity(self, action: Action) -> Tuple[bool, str]: + """ + 检查动作是否合法 + """ + if action.action_type == ActionType.END_OF_TURN: + return True, "OK" + + map_data = self.map.nodes + spotted_enemy_ids = self.spotted_enemy_ids + if action.agent_id not in self.teams[self.player_id].agents: + return False, f"Invalid agent id: {action.agent_id}" + + agent = self.get_agent(action.agent_id) + + if action.action_type not in agent.action_space.keys(): + return False, f"Invalid action {action.action_type.name} for agent {agent.agent_id[:8]}" + + if action.action_type == ActionType.MOVE: + des = action.des + + node = map_data[des] + + if node.team_id == agent.team_id and (not agent.agent_type in (target := self.get_agent(node.agent_id)).available_types or len(target.parked_agents) >= target.capacity): + return False, f"{node.agent_id[:8]} at {des} is an ally but not available" + + if node.agent_id in spotted_enemy_ids: + return False, f"{des} has been occupied by enemy {node.agent_id[:8]}" + + if get_axial_dis(agent.pos, des) > agent.mobility: + return False, f"Destination is out of range. Mobility: {agent.mobility} Euclidean distance: {get_axial_dis(agent.pos, des)}" + + try: + mobility = min(agent.fuel, agent.mobility) + adj_pos = get_adj_pos(agent.pos, int(mobility)) + aval_data = { + pos: node + for pos in adj_pos if pos in map_data and + ( + (node := map_data[pos]).team_id != self.enemy_id + or + node.agent_id not in spotted_enemy_ids + ) + } + aval_nodes = astar_search(aval_data, agent.move_type, start=agent.pos, limit=mobility) + if action.des in aval_nodes: + return True, "OK" + else: + return False, f"{des} could not be reached in one step." + except Exception as e: + return False, f"Failed to move to {des}: {e}" + + elif action.action_type == ActionType.ATTACK: + try: + target_agent = self.get_agent(action.target_id) + except Exception as e: + return False, f"Invalid target agent id: {action.target_id}, ignored" + + des = action.des + + if agent.team_id == target_agent.team_id: + return False, f"Cannot attack teammate. Attack: {action.agent_id}. Defend: {action.target_id}" + if agent.weapon is None: + return False, "No weapon equipped" + if agent.ammo <= 0: + return False, "No ammo left" + if target_agent.move_type not in agent.strike_types: + return False, "Target agent cannot be attacked with this weapon" + if get_axial_dis(agent.pos, des) > agent.attack_range: + return False, f"Target agent is out of range. Attack range: {agent.attack_range} Distance: {get_axial_dis(agent.pos, des)}" + + elif action.action_type == ActionType.SWITCH_WEAPON: + weapon_idx = action.weapon_idx + around_supply = False + for pos in get_adj_pos(agent.pos, 1): + if pos in map_data and map_data[pos].agent_id and (target_agent := self.get_agent(map_data[pos].agent_id)).team_id == agent.team_id and target_agent.has_supply: + around_supply = True + if not around_supply: + return False, "Agent is not around a supply module" + if weapon_idx >= len(agent.switchable_weapons): + return False, f"Invalid weapon id {weapon_idx}. Max id: {len(agent.switchable_weapons) - 1}" + + elif action.action_type == ActionType.RELEASE: + if action.target_id >= len(agent.parked_agents): + return False, f"Trying to release an agent that is not in the queue. Target id: {action.target_id}, Current queue: {agent.parked_agents}" + target_agent = agent.parked_agents[action.target_id] + des = action.des + if get_axial_dis(agent.pos, des) > 1: + return False, "Only adjacent positions can be released" + if map_data[des].team_id != -1: + return False, "Cannot release agent to occupied position" + + elif action.action_type == ActionType.INTERACT: + agent = self.get_agent(action.agent_id) + target_agent = self.get_agent(action.target_id) + if get_axial_dis(agent.pos, target_agent.pos) > 1: + return False, "Target agent is out of range" + if agent.team_id != target_agent.team_id: + return False, "Cannot interact with enemy agent" + if agent.agent_type not in target_agent.available_types: + return False, f"Target agent {target_agent.agent_id[:8]} is not interactable with agent {agent}" + if len(target_agent.parked_agents) >= target_agent.capacity: + return False, f"Target agent {target_agent.agent_id[:8]} full" + + return True, "OK" + + def update(self, sync_info): + """ + 使用真实引擎 + """ + sync_agents = sync_info['units'] + spotted_enemies = sync_info['spottedHostiles'] + map_data = self.map.nodes + + reward = 0 + + for sync_agent in [*sync_agents, *spotted_enemies]: + agent_id = sync_agent['agent_id'] + agent = self.get_agent(agent_id) + # 清除与地图关联 + if agent.pos != (-1, -1): + node = map_data[agent.pos] + if node.agent_id == agent_id: + node.agent_id = None + node.team_id = -1 + agent.pos = (sync_agent['pos']['q'], sync_agent['pos']['r']) + agent.fuel = sync_agent['fuel'] + if agent.team_id == self.enemy_id: + reward += agent.endurance - sync_agent['endurance'] + agent.endurance = sync_agent['endurance'] + agent.commenced_action = sync_agent['commenced_action'] + current_weapon = sync_agent['current_weapon'] + if current_weapon: + for weapon in agent.switchable_weapons: + if weapon.name == current_weapon: + agent.weapon = weapon + break + agent.weapon.ammo = sync_agent['ammo'] + else: + agent.weapon = None + # 更新与地图关联 + if agent.alive: + node = map_data[agent.pos] + node.agent_id = agent_id + node.team_id = agent.team_id + + for agent_id in self._spotted_enemy_ids: + agent = self.get_agent(agent_id) + # 清除与地图关联 + if agent.pos != (-1, -1): + node = map_data[agent.pos] + if node.agent_id == agent_id: + node.agent_id = None + node.team_id = -1 + + # 更新敌人感知 + self._spotted_enemy_ids = [enemy['agent_id'] for enemy in spotted_enemies] + + """ + NOTE: Clear the states before observation + """ + self._legal_actions = None + self._legal_action_ids = None + self._spotted_agents = None + + obs = self.observe() + truncated = self.episode_steps >= self.max_episode_steps + terminated = self.teams[self.enemy_id].alive_count == 0 + self._cumulative_rewards[self.player_id] += reward + + info = {} + info["accum_reward"] = (self._cumulative_rewards[0], self._cumulative_rewards[1]) + info["next_player"] = self.player_id + 1 if not terminated else -1 + + if self.replay_path is not None: + self._frames[self.player_id].append(self.render(mode="rgb_array")) + + if terminated or truncated: + # The eval_episode_return is calculated from Player 1's perspective + info["eval_episode_return"] = reward if self.player_id == 0 else -reward + info["done_reason"] = "Terminated" if terminated else "Truncated" + + if self.replay_path is not None: + self.save_replay() + + return obs, reward, terminated, truncated, info + + + + \ No newline at end of file