Source code for dolor.yggdrasil

"""Code for interfacing with Mojang's Yggdrasil API."""

import aiohttp
import uuid

[docs]class YggdrasilError(Exception): """An error from the Yggdrasil API.""" def __init__(self, status, info): self.status = status self.info = info msg = f"[{self.status}] {self.info['error']}" error_message = self.info.get("errorMessage") if error_message is not None: msg += f": {error_message}" cause = self.info.get("cause") if cause is not None: msg += f" (Cause: {cause})" super().__init__(msg)
[docs]class AuthenticationToken: """An abstraction over the Yggdrasil API. Parameters ---------- access_token : :class:`str`, optional The client's access token. client_token : :class:`str`, optional The client's client token. Must be present if ``access_token`` is specified. username : :class:`str`, optional The client's username. password : :class:`str`, optional The client's password. Must be present if ``username`` is specified. Attributes ---------- profile : :class:`Profile` or ``None`` The authentication token's associated profile. """ auth_server = "https://authserver.mojang.com" headers = {"content-type": "application/json"} agent = { "name": "Minecraft", "version": 1, }
[docs] class Profile: """Represents the profile of an :class:`AuthenticationToken`.""" def __init__(self, name, id): self.name = name self.id = id @property def uuid(self): """A :class:`uuid.UUID` representation of the profile's id.""" return uuid.UUID(hex=self.id)
def __init__(self, *, access_token=None, client_token=None, username=None, password=None): if access_token is not None and client_token is None: raise ValueError("Access token without client token") if username is not None and password is None: raise ValueError("Username without password") if password is not None and username is None: raise ValueError("Password without username") self.access_token = access_token self.client_token = client_token self.username = username self.password = password self.profile = None
[docs] async def ensure(self, *, try_validate=False): """Ensures that the authentication token is authenticated. If :attr:`username` is populated, then :meth:`authenticate` will be run. Else, if ``try_validate`` is ``True``, then it will see if the authentication token is valid by running :meth:`validate`. If it's not valid or ``try_validate`` is ``False``, then :meth:`refresh` will be called. Parameters ---------- try_validate : :class:`bool` Whether to try to validate before refreshing the token. """ if self.username is not None: await self.authenticate() else: if not try_validate or not self.validate(): await self.refresh()
[docs] async def validate(self): """Checks whether the authentication token is valid. Requires the :attr:`access_token` attribute to be populated. Returns ------- :class:`bool` Whether the authentication token is valid. """ data = { "accessToken": self.access_token, "clientToken": self.client_token, } try: await self.make_request("validate", data, 204) except YggdrasilError: return False return True
[docs] async def refresh(self): """Refreshes the authentication token. Requires the :attr:`access_token` attribute to be populated. Populates the :attr:`profile` attribute. """ data = { "accessToken": self.access_token, "clientToken": self.client_token, } info = await self.make_request("refresh", data) self.access_token = info["accessToken"] self.client_token = info["clientToken"] self.profile = self.Profile(info["selectedProfile"]["name"], info["selectedProfile"]["id"])
[docs] async def authenticate(self, invalidate_prev=False): """Authenticates the authentication token. Requires the :attr:`username` attribute to be populated. Populates the :attr:`profile` attribute. Parameters ---------- invalidate_prev : :class:`bool` Whether or not to invalidate previous access tokens. """ data = { "agent": self.agent, "username": self.username, "password": self.password, } if not invalidate_prev: data["clientToken"] = self.client_token or uuid.uuid4().hex info = await self.make_request("authenticate", data) self.access_token = info["accessToken"] self.client_token = info["clientToken"] self.profile = self.Profile(info["selectedProfile"]["name"], info["selectedProfile"]["id"])
[docs] async def signout(self): """Invalidates previous access tokens by using the username and password. Requires the :attr:`username` attribute to be populated. """ data = { "username": self.username, "password": self.password, } await self.make_request("signout", data)
[docs] async def invalidate(self): """Invalidates previous access tokens by using the access and client tokens. Requires the :attr:`access_token` attribute to be populated. """ data = { "accessToken": self.access_token, "clientToken": self.client_token, } await self.make_request("invalidate", data)
[docs] async def make_request(self, endpoint, data, ok_status_code=200): """A general function for making a request to the Yggdrasil API. Parameters ---------- endpoint : :class:`str` The endpoint to make the request to. data : :class:`dict` The data to send. ok_status_code : :class:`int`, optional The status code to expect. Any other will result in an :exc:`YggdrasilError`. Returns ------- :class:`dict` or ``None`` The data received from the Yggdrasil API. ``None`` is returned if an :exc:`aiohttp.ContentTypeError` is raised when getting the data. Raises ------ :exc:`YggdrasilError` If the returned status code is different than expected. """ async with aiohttp.ClientSession() as s: async with s.post(f"{self.auth_server}/{endpoint}", json = data, headers = self.headers, ) as resp: if resp.status != ok_status_code: raise YggdrasilError(resp.status, await resp.json()) try: return await resp.json() except aiohttp.ContentTypeError: return None