Source code for antorum.utils

import asyncio
import base64
import datetime
import enum
import logging
from typing import Literal, List, TYPE_CHECKING, Dict, Tuple, Union
import struct

if TYPE_CHECKING:
    from antorum import multiplayer
    from antorum.packets.inventory import InventoryItem
    from antorum.packets.inventory_add import ItemResource
    from antorum.packets.world_entities import Entity
    from antorum.packets.chat import ChatMessage

from Cryptodome.PublicKey import RSA
from Cryptodome.Cipher import PKCS1_v1_5

BYTEORDER: Literal['little', 'big'] = "big"

ENEMIES = ["Gremneer"]


@enum.unique
class StateType(enum.Enum):
    """Identifiers for the kinds of state an entity can carry.

    Helpers in this module use these members as keys into an entity's
    ``states`` mapping (for example ``StateType.PLAYER`` and
    ``StateType.MOVEMENT``).
    """

    # Values are explicit and contiguous; do not renumber them.
    INFO = 0
    TRANSFORM = 1
    MOVEMENT = 2
    HEALTH = 3
    ITEM = 4
    INTERACTABLE = 5
    PLAYER = 6
    EQUIPMENT = 7
    NPC = 8
    FISHER = 9
    CLASS = 10
    QUEST_GIVER = 11
    MINER = 12
    ANIMATOR = 13
class EncryptionHelper:
    """Encrypt payloads with the public half of an RSA key."""

    def __init__(self, key):
        """Parse ``key`` with ``RSA.import_key`` and keep the resulting key object."""
        self.key = RSA.import_key(key)

    def encrypt(self, data):
        """Return ``data`` encrypted with PKCS#1 v1.5, base64-encoded.

        The cipher is built from the public key derived from ``self.key``;
        the return value is the ``bytes`` produced by ``base64.b64encode``.
        """
        public_key = self.key.public_key()
        cipher = PKCS1_v1_5.new(public_key)
        ciphertext = cipher.encrypt(data)
        return base64.b64encode(ciphertext)
class BufferReader:
    """Sequential decoder over a ``bytes`` payload.

    ``pointer`` is the offset of the next unread byte and is advanced by
    every read. Multi-byte values are decoded using the module-level
    ``BYTEORDER``.
    """

    def __init__(self, data: bytes):
        self.data = data
        self.pointer = 0

    def read(self, size: int) -> bytes:
        """Return the next ``size`` bytes and advance the pointer by ``size``.

        Slicing never raises, so reading past the end yields fewer bytes
        (possibly none) while the pointer still moves by ``size``.
        """
        start = self.pointer
        end = start + size
        chunk = self.data[start:end]
        self.pointer = end
        return chunk

    def _read_int(self, width: int, signed: bool) -> int:
        """Decode the next ``width`` bytes as an integer."""
        return int.from_bytes(self.read(width), BYTEORDER, signed=signed)

    def read_bool(self) -> bool:
        """Read one byte; any non-zero value is ``True``."""
        return self.read_int8() != 0

    def read_int8(self, signed: bool = False) -> int:
        """Read a 1-byte integer."""
        return self._read_int(1, signed)

    def read_int16(self, signed: bool = False) -> int:
        """Read a 2-byte integer."""
        return self._read_int(2, signed)

    def read_int32(self, signed: bool = False) -> int:
        """Read a 4-byte integer."""
        return self._read_int(4, signed)

    def read_int64(self, signed: bool = False) -> int:
        """Read an 8-byte integer."""
        return self._read_int(8, signed)

    def read_string(self) -> str:
        """Read an unsigned 8-byte length, then that many bytes decoded as UTF-8."""
        byte_count = self.read_int64()
        raw = self.read(byte_count)
        return raw.decode("utf-8")

    def read_float(self) -> float:
        """Read a 4-byte IEEE-754 float in the module byte order."""
        fmt = (">" if BYTEORDER == "big" else "<") + "f"
        (value,) = struct.unpack(fmt, self.read(4))
        return value
class BufferWriter:
    """Accumulates an outgoing payload as ``bytes``.

    Multi-byte values are encoded using the module-level ``BYTEORDER``.
    The accumulated payload is available as ``self.data`` or via
    ``bytes(writer)``.
    """

    def __init__(self):
        self.data = b""

    def __bytes__(self):
        return self.data

    def write(self, data: bytes):
        """Append raw bytes to the payload."""
        self.data += data

    def write_int8(self, value: int, signed: bool = False):
        """Append ``value`` as a 1-byte integer.

        Raises ``OverflowError`` if ``value`` does not fit (including a
        negative value when ``signed`` is false).
        """
        self.write(value.to_bytes(1, BYTEORDER, signed=signed))

    def write_int16(self, value: int, signed: bool = False):
        """Append ``value`` as a 2-byte integer (see ``write_int8``)."""
        self.write(value.to_bytes(2, BYTEORDER, signed=signed))

    def write_int32(self, value: int, signed: bool = False):
        """Append ``value`` as a 4-byte integer (see ``write_int8``)."""
        self.write(value.to_bytes(4, BYTEORDER, signed=signed))

    def write_int64(self, value: int, signed: bool = False):
        """Append ``value`` as an 8-byte integer (see ``write_int8``)."""
        self.write(value.to_bytes(8, BYTEORDER, signed=signed))

    def write_float(self, value: float):
        """Append ``value`` as a 4-byte IEEE-754 float."""
        fmt = (">" if BYTEORDER == "big" else "<") + "f"
        self.write(struct.pack(fmt, value))

    def write_string(self, value: str):
        """Append a UTF-8 string prefixed with its length in bytes.

        The prefix must count encoded bytes, not characters: the matching
        reader consumes exactly that many bytes before decoding, so a
        character count would truncate any non-ASCII string.
        """
        encoded = value.encode("utf-8")
        self.write_int64(len(encoded))
        self.write(encoded)

    def write_bytes(self, value: bytes):
        """Append ``value`` prefixed with its length as an 8-byte integer."""
        self.write_int64(len(value))
        self.write(value)
def get_entity_from_player_id(player_id: int, entities: List["Entity"]) -> Union[None, "Entity"]:
    """Return the first entity whose PLAYER state carries ``player_id``.

    Entities without a (truthy) ``StateType.PLAYER`` entry in ``states``
    are skipped. Returns ``None`` when no entity matches.
    """
    for entity in entities:
        player_state = entity.states.get(StateType.PLAYER)
        if player_state and player_state.state.player_id == player_id:
            return entity
    return None
def get_future_position_from_entity(network_id, game: "multiplayer.Game"):
    """Return where the entity ``network_id`` will end up.

    That is the last queued destination of its MOVEMENT state when any
    destinations are queued, otherwise the entity's current ``position``.
    """
    entity = game.entities[network_id]
    queued = entity.states[StateType.MOVEMENT].state.destinations
    if queued:
        return queued[-1]
    return entity.position
def get_player_id_from_username(username: str, game: "multiplayer.Game") -> Union[None, int]:
    """Return the player id of the entity named ``username``, or ``None``.

    Entities whose name matches but which carry no ``StateType.PLAYER``
    state are skipped rather than raising ``KeyError``, so a non-player
    entity sharing the name cannot break the lookup.
    """
    for entity in game.entities.values():
        if entity.name != username:
            continue
        player_state = entity.states.get(StateType.PLAYER)
        if player_state:
            return player_state.state.player_id
    return None
def get_inventory_diff(old_inventory: Dict[int, "InventoryItem"], new_inventory: Dict[int, "InventoryItem"]):
    """Compare two inventories slot by slot.

    Returns ``(added, removed)``, both keyed by slot index:

    * a slot only in ``new_inventory`` is added;
    * a slot whose resource id changed is removed (old item) and added (new item);
    * a slot whose amount changed is reported in ``added`` only (new item);
    * a slot only in ``old_inventory`` is removed.
    """
    added = {}
    removed = {}

    for slot, current in new_inventory.items():
        if slot not in old_inventory:
            added[slot] = current
            continue

        previous = old_inventory[slot]
        if current.resource.resource_id != previous.resource.resource_id:
            removed[slot] = previous
            added[slot] = current
        elif current.amount != previous.amount:
            added[slot] = current

    # Slots that vanished entirely.
    removed.update(
        (slot, previous)
        for slot, previous in old_inventory.items()
        if slot not in new_inventory
    )
    return added, removed
def map_to_game_coords(coords: List[Tuple[float, float]]):
    """Convert a list of map ``(x, y)`` pairs into game coordinates.

    x is scaled by 3/16; y is first flipped against 5632 and then scaled
    the same way.
    NOTE(review): the meaning of 16, 3 and 5632 is not visible here
    (presumably tile size, world scale and map height) - confirm.
    """
    converted = []
    for map_x, map_y in coords:
        game_x = (map_x / 16) * 3
        game_y = ((5632 - map_y) / 16) * 3
        converted.append((game_x, game_y))
    return converted
def is_nearby(coords: Tuple[float, float], other_coords: Tuple[float, float], distance: float = 5):
    """Return whether both axis offsets between the points are within ``distance``.

    This is a square (per-axis) test, not a Euclidean radius.
    """
    x_offset = abs(coords[0] - other_coords[0])
    if not x_offset <= distance:
        return False
    y_offset = abs(coords[1] - other_coords[1])
    return y_offset <= distance
def distance_to_entity(coords: Tuple[float, float], entity: "Entity"):
    """Return the Euclidean distance from ``coords`` to ``entity.position``."""
    target = entity.position
    x_offset = coords[0] - target[0]
    y_offset = coords[1] - target[1]
    return (x_offset ** 2 + y_offset ** 2) ** 0.5
def get_nearest_entity(coords: Tuple[float, float], entities: Dict[int, "Entity"]) -> Union[None, "Entity"]:
    """Return the entity closest to ``coords``, or ``None`` if there is none.

    Entities with a falsy ``position`` are ignored. On a tie the entity
    encountered first wins.
    """
    closest = None
    closest_squared = float("inf")
    for candidate in entities.values():
        position = candidate.position
        if not position:
            continue
        # Squared distances order the same way as real ones, so no root is taken.
        squared = (coords[0] - position[0]) ** 2 + (coords[1] - position[1]) ** 2
        if squared < closest_squared:
            closest, closest_squared = candidate, squared
    return closest
def distance_to_closest_enemy(coords: Tuple[float, float], entities: Dict[int, "Entity"]):
    """Return the smallest ``distance_to(coords)`` among enemy entities.

    An entity counts when its ``position`` is truthy and its ``name`` is
    listed in ``ENEMIES``. Returns ``float("inf")`` when none qualifies.
    """
    enemy_distances = [
        entity.distance_to(coords)
        for entity in entities.values()
        if entity.position and entity.name in ENEMIES
    ]
    # Seeding with infinity gives the "no enemy" result and keeps the same
    # strict less-than comparison sequence as a running minimum.
    return min([float("inf"), *enemy_distances])
def get_nearest_safe_entity(coords: Tuple[float, float], requested_entities: Dict[int, "Entity"],
                            all_entities: Dict[int, "Entity"], safe_distance: float = 10):
    """Return the nearest requested entity that has no enemy close to it.

    A candidate qualifies when its position is not ``(-1, -1)`` and the
    closest enemy among ``all_entities`` is farther than ``safe_distance``
    from it. Among qualifying candidates the one with the smallest
    ``distance_to(coords)`` wins; ``None`` is returned when none qualifies.
    NOTE(review): ``(-1, -1)`` looks like a placeholder for an unknown
    position - confirm.
    """
    safest = None
    safest_distance = float("inf")
    for candidate in requested_entities.values():
        has_real_position = candidate.position != (-1, -1)
        if not has_real_position:
            continue
        enemy_gap = distance_to_closest_enemy(candidate.position, all_entities)
        if not enemy_gap > safe_distance:
            continue
        candidate_distance = candidate.distance_to(coords)
        if candidate_distance < safest_distance:
            safest, safest_distance = candidate, candidate_distance
    return safest
def time_to_dest(start_coords: Tuple[float, float], dest_coords: Tuple[float, float], speed: float):
    """Return the straight-line travel time between two points.

    This is the Euclidean distance divided by ``speed``; a ``speed`` of
    zero raises ``ZeroDivisionError``.
    """
    x_offset = dest_coords[0] - start_coords[0]
    y_offset = dest_coords[1] - start_coords[1]
    distance = (x_offset ** 2 + y_offset ** 2) ** 0.5
    return distance / speed
def time_to_dests(start_coords: Tuple[float, float], destinations: List[Tuple[float, float]], speed: float):
    """Return the travel time along a path of waypoints.

    The path runs from ``start_coords`` to the first destination and then
    through each following destination in order. Returns ``0`` when
    ``destinations`` is empty.
    """
    if not destinations:
        return 0

    total = 0
    # Legs between consecutive waypoints are added first, the leg from the
    # start point last; the order is kept so float rounding is unchanged.
    for leg_start, leg_end in zip(destinations, destinations[1:]):
        total += time_to_dest(leg_start, leg_end, speed)
    total += time_to_dest(start_coords, destinations[0], speed)
    return total
def coords_in_bounds(coords: Tuple[float, float], bounds: Tuple[Tuple[float, float], Tuple[float, float]]):
    """Return whether ``coords`` lies inside the rectangle ``bounds``.

    ``bounds`` is ``((min_x, min_y), (max_x, max_y))``; both edges are
    inclusive.
    """
    x_inside = bounds[0][0] <= coords[0] <= bounds[1][0]
    if not x_inside:
        return False
    return bounds[0][1] <= coords[1] <= bounds[1][1]
def message_contains_since(message: str, messages: List[Tuple[datetime.datetime, "ChatMessage"]],
                           since: datetime.datetime):
    """Return whether any chat message at or after ``since`` contains ``message``.

    ``messages`` is scanned from its end backwards and the scan stops at
    the first entry older than ``since``, so the list is expected to be in
    chronological order.
    """
    for sent_at, chat_message in reversed(messages):
        if sent_at < since:
            break
        if message in chat_message.message:
            return True
    return False
def inventory_contains_resource_id(resource_id: int, inventory: Dict[int, "InventoryItem"], amount: int):
    """Return whether the inventory holds at least ``amount`` of a resource.

    Amounts are summed over every slot whose item has ``resource_id``.
    """
    held = sum(
        item.amount
        for item in inventory.values()
        if item.resource.resource_id == resource_id
    )
    return held >= amount
async def wait_for(predicate, timeout: int):
    """Poll ``predicate`` every 0.1 s until it is truthy or the time runs out.

    Returns ``True`` as soon as ``predicate()`` is truthy. Returns
    ``False`` once more than ``timeout * 10`` polling sleeps have elapsed;
    the predicate is not re-checked after the final sleep.
    """
    polls = 0
    while True:
        if predicate():
            return True
        await asyncio.sleep(0.1)
        polls += 1
        if polls > timeout * 10:
            return False
def get_resource_by_name(name: str, resources: Dict[int, "ItemResource"]):
    """Return the first resource whose name equals ``name``, ignoring case.

    Both sides are lower-cased before comparing; lower-casing only the
    query would make any resource stored with an uppercase letter
    impossible to find. Returns ``None`` when nothing matches.
    """
    wanted = name.lower()
    for resource in resources.values():
        resource_name = resource.resource_name
        # Non-string names can never match; skip them instead of raising.
        if isinstance(resource_name, str) and resource_name.lower() == wanted:
            return resource
    return None
def get_inventory_slot_by_resource_id(resource_id: int, inventory: Dict[int, "InventoryItem"]):
    """Return the first slot holding an item with ``resource_id``, else ``None``."""
    matching_slots = (
        slot
        for slot, item in inventory.items()
        if item.resource.resource_id == resource_id
    )
    return next(matching_slots, None)
def amount_of_resource_in_inventory(resource_id: int, inventory: Dict[int, "InventoryItem"]):
    """Return the total amount of ``resource_id`` across all inventory slots."""
    matching_amounts = (
        item.amount
        for item in inventory.values()
        if item.resource.resource_id == resource_id
    )
    return sum(matching_amounts)
def has_sufficient_level(levels: Dict[int, List[str]], level: int, item: str) -> bool:
    """Return whether ``item`` is listed under any level not above ``level``.

    ``levels`` maps a required level to the items listed for it. Every
    entry is examined, so the answer does not depend on the order of the
    mapping's keys, and the result is always a ``bool``.
    """
    return any(
        item in unlocked_items
        for required_level, unlocked_items in levels.items()
        if required_level <= level
    )
async def emulate_move(start_coords: Tuple[float, float], destinations: List[Tuple[float, float]], speed: float,
                       client: "multiplayer.Client"):
    """Advance the local player's position along a path while it is moving.

    The path runs from ``start_coords`` through each entry of
    ``destinations`` in order. For every leg the position is linearly
    interpolated between the leg's two endpoints, updated at most once per
    second. When the whole path has been walked, the position is set to
    the last destination and ``stop_moving()`` is called on the local
    player's entity.

    Returns early, leaving the position as last written, if the local
    player's entity reports ``is_moving`` as false after a sleep. Does
    nothing when ``destinations`` is empty.
    """
    if not destinations:
        return

    leg_start = start_coords
    for leg_end in destinations:
        leg_duration = time_to_dest(leg_start, leg_end, speed)
        time_left = leg_duration
        while time_left > 0:
            # Never sleep past the end of the leg, so the interpolated
            # position cannot overshoot the waypoint.
            step = min(1.0, time_left)
            await asyncio.sleep(step)
            if not client.game.entities[client.game.local_player.network_id].is_moving:
                return
            time_left -= step
            progress = 1 - time_left / leg_duration
            # Both axes interpolate from this leg's own start point.
            client.game.local_player.position = (
                leg_start[0] + (leg_end[0] - leg_start[0]) * progress,
                leg_start[1] + (leg_end[1] - leg_start[1]) * progress,
            )
            logging.debug(f"Emulated move to {client.game.local_player.position}")
        leg_start = leg_end

    # Snap to the exact final waypoint to drop accumulated float error.
    client.game.local_player.position = destinations[-1]
    client.game.entities[client.game.local_player.network_id].stop_moving()