Files
mcstatus/mcstatus/pinger.py
2015-01-26 16:15:27 -08:00

147 lines
5.7 KiB
Python

import datetime
import json
import random
from six import string_types
from mcstatus.protocol.connection import Connection
class ServerPinger:
def __init__(self, connection, host="", port=0, version=47, ping_token=None):
if ping_token is None:
ping_token = random.randint(0, (1 << 63) - 1)
self.version = version
self.connection = connection
self.host = host
self.port = port
self.ping_token = ping_token
def handshake(self):
packet = Connection()
packet.write_varint(0)
packet.write_varint(self.version)
packet.write_utf(self.host)
packet.write_ushort(self.port)
packet.write_varint(1) # Intention to query status
self.connection.write_buffer(packet)
def read_status(self):
request = Connection()
request.write_varint(0) # Request status
self.connection.write_buffer(request)
response = self.connection.read_buffer()
if response.read_varint() != 0:
raise IOError("Received invalid status response packet.")
try:
raw = json.loads(response.read_utf())
except ValueError:
raise IOError("Received invalid JSON")
try:
return PingResponse(raw)
except ValueError as e:
raise IOError("Received invalid status response: %s" % e)
def test_ping(self):
request = Connection()
request.write_varint(1) # Test ping
request.write_long(self.ping_token)
sent = datetime.datetime.now()
self.connection.write_buffer(request)
response = self.connection.read_buffer()
received = datetime.datetime.now()
if response.read_varint() != 1:
raise IOError("Received invalid ping response packet.")
received_token = response.read_long()
if received_token != self.ping_token:
raise IOError("Received mangled ping response packet (expected token %d, received %d)" % (
self.ping_token, received_token))
delta = (received - sent)
# We have no trivial way of getting a time delta :(
return (delta.days * 24 * 60 * 60 + delta.seconds) * 1000 + delta.microseconds / 1000.0
class PingResponse:
class Players:
class Player:
def __init__(self, raw):
if type(raw) is not dict:
raise ValueError("Invalid player object (expected dict, found %s" % type(raw))
if "name" not in raw:
raise ValueError("Invalid player object (no 'name' value)")
if not isinstance(raw["name"], string_types):
raise ValueError("Invalid player object (expected 'name' to be str, was %s)" % type(raw["name"]))
self.name = raw["name"]
if "id" not in raw:
raise ValueError("Invalid player object (no 'id' value)")
if not isinstance(raw["id"], string_types):
raise ValueError("Invalid player object (expected 'id' to be str, was %s)" % type(raw["id"]))
self.id = raw["id"]
def __init__(self, raw):
if type(raw) is not dict:
raise ValueError("Invalid players object (expected dict, found %s" % type(raw))
if "online" not in raw:
raise ValueError("Invalid players object (no 'online' value)")
if type(raw["online"]) is not int:
raise ValueError("Invalid players object (expected 'online' to be int, was %s)" % type(raw["online"]))
self.online = raw["online"]
if "max" not in raw:
raise ValueError("Invalid players object (no 'max' value)")
if type(raw["max"]) is not int:
raise ValueError("Invalid players object (expected 'max' to be int, was %s)" % type(raw["max"]))
self.max = raw["max"]
if "sample" in raw:
if type(raw["sample"]) is not list:
raise ValueError("Invalid players object (expected 'sample' to be list, was %s)" % type(raw["max"]))
self.sample = [PingResponse.Players.Player(p) for p in raw["sample"]]
else:
self.sample = None
class Version:
def __init__(self, raw):
if type(raw) is not dict:
raise ValueError("Invalid version object (expected dict, found %s" % type(raw))
if "name" not in raw:
raise ValueError("Invalid version object (no 'name' value)")
if not isinstance(raw["name"], string_types):
raise ValueError("Invalid version object (expected 'name' to be str, was %s)" % type(raw["name"]))
self.name = raw["name"]
if "protocol" not in raw:
raise ValueError("Invalid version object (no 'protocol' value)")
if type(raw["protocol"]) is not int:
raise ValueError("Invalid version object (expected 'protocol' to be int, was %s)" % type(raw["protocol"]))
self.protocol = raw["protocol"]
def __init__(self, raw):
self.raw = raw
if "players" not in raw:
raise ValueError("Invalid status object (no 'players' value)")
self.players = PingResponse.Players(raw["players"])
if "version" not in raw:
raise ValueError("Invalid status object (no 'version' value)")
self.version = PingResponse.Version(raw["version"])
if "description" not in raw:
raise ValueError("Invalid status object (no 'description' value)")
self.description = raw["description"]
if "favicon" in raw:
self.favicon = raw["favicon"]
else:
self.favicon = None
self.latency = None