mirror of
https://github.com/Dinnerbone/mcstatus.git
synced 2026-04-06 03:51:23 +08:00
77 lines
2.5 KiB
Python
77 lines
2.5 KiB
Python
from mcstatus.pinger import ServerPinger
|
|
from mcstatus.protocol.connection import TCPSocketConnection, UDPSocketConnection
|
|
from mcstatus.querier import ServerQuerier
|
|
import dns.resolver
|
|
|
|
|
|
class MinecraftServer:
|
|
def __init__(self, host, port=25565):
|
|
self.host = host
|
|
self.port = port
|
|
|
|
@staticmethod
|
|
def lookup(address):
|
|
host = address
|
|
port = None
|
|
if ":" in address:
|
|
parts = address.split(":")
|
|
if len(parts) > 2:
|
|
raise ValueError("Invalid address '%s'" % address)
|
|
host = parts[0]
|
|
port = int(parts[1])
|
|
if port is None:
|
|
port = 25565
|
|
try:
|
|
answers = dns.resolver.query("_minecraft._tcp." + host, "SRV")
|
|
if len(answers):
|
|
answer = answers[0]
|
|
host = str(answer.target).rstrip(".")
|
|
port = int(answer.port)
|
|
except Exception:
|
|
pass
|
|
|
|
return MinecraftServer(host, port)
|
|
|
|
def ping(self, retries=3, **kwargs):
|
|
attempt = 0
|
|
connection = TCPSocketConnection((self.host, self.port))
|
|
exception = None
|
|
while attempt < retries:
|
|
try:
|
|
pinger = ServerPinger(connection, host=self.host, port=self.port, **kwargs)
|
|
pinger.handshake()
|
|
return pinger.test_ping()
|
|
except Exception as e:
|
|
exception = e
|
|
attempt += 1
|
|
raise exception
|
|
|
|
def status(self, retries=3, **kwargs):
|
|
attempt = 0
|
|
connection = TCPSocketConnection((self.host, self.port))
|
|
exception = None
|
|
while attempt < retries:
|
|
try:
|
|
pinger = ServerPinger(connection, host=self.host, port=self.port, **kwargs)
|
|
pinger.handshake()
|
|
result = pinger.read_status()
|
|
result.latency = pinger.test_ping()
|
|
return result
|
|
except Exception as e:
|
|
exception = e
|
|
attempt += 1
|
|
raise exception
|
|
|
|
def query(self, retries=3):
|
|
attempt = 0
|
|
exception = None
|
|
while attempt < retries:
|
|
try:
|
|
connection = UDPSocketConnection((self.host, self.port))
|
|
querier = ServerQuerier(connection)
|
|
querier.handshake()
|
|
return querier.read_query()
|
|
except Exception as e:
|
|
exception = e
|
|
attempt += 1
|
|
raise exception |