Source code for dolor.servers.server

import asyncio
import aiohttp
import base64
import uuid
import time
from aioconsole import aprint

from .. import util
from .. import enums
from .. import encryption
from .. import connection
from ..packet_handler import packet_listener, PacketHandler
from ..versions import Version
from ..types import Chat, Identifier
from ..packets import PacketContext, serverbound, clientbound

[docs]def connection_task(func): """Decorator for connection tasks within a class.""" func._connection_task = True return func
[docs]class ServerConnection(connection.Connection): def __init__(self, server, reader, writer): self.ctx = PacketContext(Version(None)) super().__init__(serverbound) self.server = server self.reader = reader self.writer = writer self.should_listen_sequentially = True self.tasks = [] self.central_task = None self.name = "" self.uuid = uuid.UUID(int=0) @property def is_player(self): return self.current_state == enums.State.Play
[docs] async def disconnect(self, reason): if isinstance(reason, Exception): reason = f"{type(reason).__name__}: {reason}" packet = { enums.State.Login: clientbound.DisconnectLoginPacket, enums.State.Play: clientbound.DisconnectPlayPacket, }.get(self.current_state) if packet is not None: try: await self.write_packet(packet, reason = reason, ) except: pass self.close() await self.wait_closed()
[docs] async def set_compression(self, threshold): await self.write_packet(clientbound.SetCompressionPacket, threshold = threshold, ) self.comp_threshold = threshold
[docs] async def login_success(self): await self.write_packet(clientbound.LoginSuccessPacket, uuid = self.uuid, username = self.name, ) self.current_state = enums.State.Play self.should_listen_sequentially = False self.central_task = asyncio.create_task(self.server.central_connection_task(self))
[docs] async def message(self, message, *, position=enums.ChatPosition.Chat, sender=None): if sender is None: sender = uuid.UUID(int=0) else: sender = sender.uuid await self.write_packet(clientbound.ChatMessagePacket, data = message, position = position, sender = sender, )
[docs] def close(self): if not self.is_closing(): super().close() if self.central_task is not None: self.central_task.cancel()
[docs] async def wait_closed(self): if self.is_closing(): await super().wait_closed() if self.central_task is not None: # Use asyncio.wait_for? await self.central_task
[docs] def create_task(self, coro): """Internal function used to ensure that all listeners complete.""" real_task = asyncio.create_task(coro) self.tasks.append(real_task) async def task(): try: await real_task finally: self.tasks.remove(real_task) return asyncio.create_task(task())
[docs] async def write_packet(self, *args, **kwargs): p = await super().write_packet(*args, **kwargs) await self.server.listen_to_packet(self, p, outgoing=True) return p
def __eq__(self, other): return self.uuid == other.uuid def __hash__(self): return hash(self.uuid) def __repr__(self): return f"{type(self).__name__}({repr(self.name)}, {repr(self.uuid)})"
[docs]class Server(PacketHandler): session_server = "https://sessionserver.mojang.com/session/minecraft" Connection = ServerConnection def __init__(self, version, address, port=25565, *, lang_file = None, max_players = 20, description = None, favicon = None, offline = False, comp_enabled = True, comp_threshold = 256, ): self.version = Version(version, check_supported=True) self.address = address self.port = port self.private_key = None self.public_key = None self.srv = None self.connections = [] if lang_file is not None: Chat.Chat.load_translations(lang_file) self.max_players = max_players if favicon is not None: if util.is_pathlike(favicon): with open(favicon, "rb") as f: favicon = f.read() favicon = base64.encodebytes(favicon).replace(b"\n", b"") favicon = f"data:image/png;base64,{favicon.decode('utf-8')}" self.description = util.default(description, Chat.default()) self.favicon = favicon self.offline = offline self.comp_enabled = comp_enabled self.comp_threshold = comp_threshold self.connection_tasks = [] self.register_intrinsic_connection_tasks() super().__init__()
[docs] def register_connection_task(self, func): """ Registers a packet listener. func is a coroutine function. """ if not asyncio.iscoroutinefunction(func): raise TypeError(f"Connection task {func.__name__} isn't a coroutine function") self.connection_tasks.append(func)
[docs] def unregister_connection_task(self, func): """Unregisters a connection task.""" self.connection_tasks.remove(func)
[docs] def external_connection_task(self, func): """Decorator for external connection tasks.""" self.register_connection_task(func) return func
[docs] def register_intrinsic_connection_tasks(self): for attr in dir(self): func = getattr(self, attr) # If the function was decorated with # the connection_task function, then # it will have the _connection_task # attribute, which will also be true if hasattr(func, "_connection_task") and func._connection_task: self.register_connection_task(func)
[docs] def safe_connection_func(self, task): async def safe(c, *args, **kwargs): try: await task(c, *args, **kwargs) except Exception as e: await c.disconnect(e) return safe
[docs] def register_packet_listener(self, *args, outgoing=False): super().register_packet_listener(*args, outgoing=outgoing)
[docs] async def listen_to_packet(self, c, p, *, outgoing): listeners = self.listeners_for_packet(c, p, outgoing=outgoing) listeners = [self.safe_connection_func(x) for x in listeners] if c.should_listen_sequentially: await asyncio.gather(*(x(c, p) for x in listeners)) else: for func in listeners: c.create_task(func(c, p))
[docs] async def listen(self, c): while self.is_serving() and not c.is_closing(): try: p = await c.read_packet() except Exception as e: await c.disconnect(e) break if p is None: break await self.listen_to_packet(c, p, outgoing=False) try: await asyncio.wait_for(asyncio.gather(*c.tasks), 1) except asyncio.TimeoutError: for task in c.tasks: task.cancel()
[docs] async def new_connection(self, reader, writer): c = self.Connection(self, reader, writer) async with c: self.append(c) try: await self.listen(c) finally: self.remove(c)
[docs] async def central_connection_task(self, c): tasks = [self.safe_connection_func(x) for x in self.connection_tasks] await asyncio.gather(*(x(c) for x in tasks))
[docs] async def main_task(self): while self.is_serving(): await asyncio.sleep(1)
[docs] def is_serving(self): return self.srv is not None and self.srv.is_serving()
[docs] def close(self): if self.srv is not None: for c in self.connections: c.close() self.connections.clear() self.srv.close()
[docs] async def wait_closed(self): if self.srv is not None: await self.srv.wait_closed()
def __del__(self): self.close() async def __aenter__(self): return self async def __aexit__(self, exc_type, exc_value, exc_tb): self.close() await self.wait_closed()
[docs] def append(self, conn): self.connections.append(conn)
[docs] def remove(self, conn): # Loop over connections to check identity # of connections instead of equality for i, c in enumerate(self.connections): if c is conn: self.connections.pop(i)
[docs] async def startup(self): self.private_key, self.public_key = encryption.gen_private_public_keys() self.srv = await asyncio.start_server(self.new_connection, self.address, self.port)
[docs] async def on_start(self): await self.main_task()
[docs] async def start(self): await self.startup() async with self: await self.on_start()
[docs] def run(self): try: asyncio.run(self.start()) except KeyboardInterrupt: self.close()
@property def players(self): return [x for x in self.connections if x.is_player]
[docs] async def send_server_hash(self, c, server_hash): async with aiohttp.ClientSession() as s: async with s.get( f"{self.session_server}/hasJoined?username={c.name}&serverId={server_hash}", ) as resp: if resp.status == 204: return False # TODO: Verify signature? data = await resp.json() c.uuid = uuid.UUID(hex=data["id"]) return True
# Default packet listeners and tasks @packet_listener(serverbound.HandshakePacket) async def _on_handshake(self, c, p): c.ctx = PacketContext(Version(p.proto_version)) c.current_state = p.next_state @packet_listener(serverbound.RequestPacket) async def _on_request(self, c, p): await c.write_packet(clientbound.ResponsePacket, response = clientbound.ResponsePacket.Response.Response( version = self.version, players = { "max": self.max_players, "online": len(self.players), "sample": [], }, description = self.description, favicon = self.favicon, ), ) @packet_listener(serverbound.PingPacket) async def _on_ping(self, c, p): await c.write_packet(clientbound.PongPacket, payload = p.payload, ) @packet_listener(serverbound.LoginStartPacket) async def _on_login_start(self, c, p): if c.ctx.version != self.version: await c.disconnect({ "translate": "multiplayer.disconnect.outdated_client", "with": [self.version.name], }) return if len(self.players) >= self.max_players: await c.disconnect({ "translate": "multiplayer.disconnect.server_full", }) return if p.name in (x.name for x in self.connections): await c.disconnect({ "translate": "multiplayer.disconnect.name_taken", }) return c.name = p.name if not self.offline: c.verify_token = encryption.gen_verify_token() await c.write_packet(clientbound.EncryptionRequestPacket, server_id = "", public_key = self.public_key, verify_token = c.verify_token, ) else: if self.comp_enabled: await c.set_compression(self.comp_threshold) await c.login_success() @packet_listener(serverbound.EncryptionResponsePacket) async def _on_encryption_response(self, c, p): shared_secret, verify_token = encryption.decrypt_secret_and_token(self.private_key, p.shared_secret, p.verify_token) c.enable_encryption(shared_secret) if verify_token != c.verify_token: await c.disconnect({ "translate": "multiplayer.disconnect.generic", }) return del c.verify_token server_hash = encryption.gen_server_hash("", shared_secret, self.public_key) if not await self.send_server_hash(c, server_hash): await c.disconnect({ "translate": "multiplayer.disconnect.unverified_username", }) return if c in self.players: await c.disconnect({ "translate": "multiplayer.disconnect.duplicate_login", }) return if self.comp_enabled: await c.set_compression(self.comp_threshold) await c.login_success() @connection_task async def _join_game_task(self, c): # TODO: Make this all programmatic dim_identifier = Identifier.Identifier("minecraft:overworld") p = c.create_packet(clientbound.JoinGamePacket, entity_id = 1, game_mode = enums.GameMode.Survival, prev_game_mode = enums.GameMode.Invalid, world_names = [dim_identifier], world_name = dim_identifier, max_players = self.max_players, view_distance = 10, ) if c.ctx.version < "20w21a": pass elif c.ctx.version < "1.16.2-pre3": p.dimension = dim_identifier else: p.dimension.name = dim_identifier p.dimension.shrunk = False p.dimension.coordinate_scale = 1.0 p.dimension.infiniburn = "minecraft:infiniburn_overworld" p.dimension.effects = dim_identifier if c.ctx.version < "1.16-pre3": p.dimension_codec["dimension"] = [{ "key": dim_identifier, "element": dim_identifier, }] elif c.ctx.version < "20w28a": p.dimension_codec["dimension"] = [p.dimension] else: p.dimension_codec["minecraft:dimension_type"] = { "type": "minecraft:dimension_type", "value": [{ "name": dim_identifier, "id": 0, "element": p.dimension, }], } p.dimension_codec["minecraft:worldgen/biome"] = { "type": "minecraft:worldgen/biome", "value": [{ "name": "minecraft:plains", "id": 1, "element": { "category": "plains", "precipitation": "rain", "downfall": 0.4000000059604645, "temperature": 0.800000011920929, "depth": 0.125, "scale": 0.05000000074505806, "effects": { "sky_color": 0x78a7ff, "fog_color": 0xc0d8ff, "water_color": 0x3f76e4, "water_fog_color": 0x050533, "mood_sound": { "sound": "minecraft:ambient.cave", "offset": 2.0, "tick_delay": 6000, "block_search_extent": 8, }, }, } }], } await c.write_packet(p) await c.write_packet(clientbound.PlayerPositionAndLook) @connection_task async def _keep_alive_task(self, c): timeout = 25 sleep = 5 while not c.is_closing(): await asyncio.sleep(sleep) keep_alive_id = int(time.time() * 1000) await c.write_packet(clientbound.KeepAlivePacket, keep_alive_id = keep_alive_id, ) try: p = await asyncio.wait_for(c.read_packet(serverbound.KeepAlivePacket), timeout) except asyncio.TimeoutError: await c.disconnect({ "translate": "disconnect.timeout", }) return if p is None: return if p.keep_alive_id != keep_alive_id: await c.disconnect({ "translate": "disconnect.timeout", }) return