44 Commits

Author SHA1 Message Date
Nathan Adams
537bf06ebd Release 2.0 2014-09-26 01:26:02 +02:00
Nathan Adams
7bcbbc7409 Added a MinecraftServer.lookup method, converts a string to host/port (including SRV lookup)
This closes #12
2014-09-26 01:06:29 +02:00
Nathan Adams
18af499c62 Always make sure we have even the optional attributes of PingResponse 2014-09-26 00:02:02 +02:00
Nathan Adams
a8d9c482e7 Retest retries of server.status 2014-09-25 23:56:55 +02:00
Nathan Adams
a6ade86fad API changes to server (closes #14) 2014-09-25 23:53:48 +02:00
Dinner Bone
bfe75cb290 Merge pull request #13 from urda/modernized-readme-fixup
Fixup some README issues
2014-09-17 11:25:57 +02:00
Peter Urda
f07be86a20 Use 'format' instead of % 2014-09-16 13:15:45 -07:00
Peter Urda
e7ac2f24f6 Fix bad query example 2014-09-16 13:14:35 -07:00
Peter Urda
7d3c926a3a Use 'format' instead of % 2014-09-16 13:10:08 -07:00
Peter Urda
d0d7e01850 This status call was wrong, in produced a tuple instead.
Honestly, ping should be wrapped up into the same variable!
2014-09-16 13:07:34 -07:00
Peter Urda
43caf72a59 query.query_server() doesn't work, fixed README 2014-09-16 12:44:12 -07:00
Peter Urda
9d4c54f6e8 Switch "installation" to bash 2014-09-16 12:43:03 -07:00
Peter Urda
947d1802cd Cleanup intro section 2014-09-16 12:42:34 -07:00
Peter Urda
ed792b4642 Set usage to python block 2014-09-16 12:40:31 -07:00
Nathan Adams
4c4239e6f8 Set default timeout to 3 2014-09-11 17:45:35 +02:00
Nathan Adams
1aff32d549 Result of ping.players.sample should be Player objects, not dicts. 2014-09-10 21:37:15 +02:00
Nathan Adams
3c0646367c We should write challenge as an int not uint 2014-09-10 21:34:23 +02:00
Nathan Adams
5c4d7fb35e Default charset is actually ISO-8859-1, not ascii. 2014-09-10 21:34:08 +02:00
Nathan Adams
5f51ee3c1d We'll also need the .protocol package! 2014-09-07 13:40:14 +02:00
Nathan Adams
7b3f9b4dd2 Fixed up setup.py to point to the correct (new) package 2014-09-07 13:35:58 +02:00
Nathan Adams
8c1f3ba50d Updated setup.py with new classifiers 2014-09-07 02:51:20 +02:00
Nathan Adams
1b17c1f702 Updated readme 2014-09-07 02:35:02 +02:00
Nathan Adams
9340ab47c1 Implemented retries on server.ping/query_server 2014-09-07 01:15:42 +02:00
Nathan Adams
08c368be12 Expose query through server.query_server() 2014-09-07 00:59:29 +02:00
Nathan Adams
4e52670370 Removed unused imports 2014-09-07 00:25:04 +02:00
Nathan Adams
1b8ebaf500 Don't get plugins using map() 2014-09-07 00:21:14 +02:00
Nathan Adams
0474c1d68d Parse Query responses into their own object 2014-09-07 00:17:08 +02:00
Nathan Adams
3de920b406 Removed __init__ files from tests. 2014-09-07 00:16:48 +02:00
Nathan Adams
c12836b011 Pack the ID byte when sending Query packets 2014-09-06 21:28:45 +02:00
Nathan Adams
3757cbe876 Fixed order of connection.write decoding 2014-09-06 21:22:35 +02:00
Nathan Adams
c6ba6778f6 Added support for Query 2014-09-06 21:19:24 +02:00
Nathan Adams
e1568b1950 Moved pinger's test file into the correct folder 2014-09-06 18:11:42 +02:00
Nathan Adams
631176be02 Added some more test cases 2014-09-05 17:49:33 +02:00
Nathan Adams
064ca652a4 Support favicon 2014-09-05 13:21:15 +02:00
Nathan Adams
c606609443 Fixing up travis once more. 2014-09-05 12:36:18 +02:00
Nathan Adams
94264f22ea Decode status results into their own object for accessibility & validation 2014-09-05 12:33:18 +02:00
Nathan Adams
cf920b312b Drop support for <=2.6 2014-09-05 00:06:15 +02:00
Nathan Adams
db2d357623 Proper py3 support! 2014-09-05 00:03:46 +02:00
Nathan Adams
e82a44f580 Use bytearray more efficiently, better python 3.1 support 2014-09-04 23:21:52 +02:00
Nathan Adams
060e52118f Include mock for travis 2014-09-04 22:41:22 +02:00
Nathan Adams
f07976fbec Added initial work for server pinging. Normal pings work, Query NYI. 2014-09-04 22:25:21 +02:00
Nathan Adams
34d0154f7a Removed everything; we're starting from scratch. 2014-09-02 17:28:59 +02:00
Nathan Adams
7c36615ea8 Perhaps fixed travis! 2014-09-02 17:26:36 +02:00
Nathan Adams
1efb667c9d Added travis support 2014-09-02 17:24:37 +02:00
22 changed files with 1346 additions and 213 deletions

103
.gitignore vendored
View File

@@ -1,2 +1,105 @@
# Created by http://www.gitignore.io
### Python ###
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
# C extensions
*.so
# Distribution / packaging
.Python
env/
build/
develop-eggs/
dist/
eggs/
lib/
lib64/
parts/
sdist/
var/
*.egg-info/
.installed.cfg
*.egg
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.cache
nosetests.xml
coverage.xml
# Translations
*.mo
*.pot
# Django stuff:
*.log
# Sphinx documentation
docs/_build/
# PyBuilder
target/
### PyCharm ###
# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm
## Directory-based project format
.idea/
/*.iml
# if you remove the above rule, at least ignore user-specific stuff:
# .idea/workspace.xml
# .idea/tasks.xml
# .idea/dictionaries
# and these sensitive or high-churn files:
# .idea/dataSources.ids
# .idea/dataSources.xml
# .idea/sqlDataSources.xml
# .idea/dynamic.xml
# and, if using gradle::
# .idea/gradle.xml
# .idea/libraries
## File-based project format
*.ipr
*.iws
## Additional for IntelliJ
out/
# generated by mpeltonen/sbt-idea plugin
.idea_modules/
# generated by JIRA plugin
atlassian-ide-plugin.xml
# generated by Crashlytics plugin (for Android Studio and Intellij)
com_crashlytics_export_strings.xml
### SublimeText ###
# workspace files are user-specific
*.sublime-workspace
# project files should be checked into the repository, unless a significant
# proportion of contributors will probably not be using SublimeText
# *.sublime-project
#sftp configuration file
sftp-config.json

8
.travis.yml Normal file
View File

@@ -0,0 +1,8 @@
language: python
python:
- 3.3
- 2.7
install:
- if [[ $TRAVIS_PYTHON_VERSION == 2* ]]; then pip install -r requirements/python2.txt --use-mirrors; fi
- if [[ $TRAVIS_PYTHON_VERSION == 3* ]]; then pip install -r requirements/python3.txt --use-mirrors; fi
script: nosetests

View File

@@ -1,3 +1,10 @@
2.0
* Rewrote all the things!
* Removed CLI. May add it back in future, if there's demand.
* Supports query and status.
* Supports SRV lookups.
* Many tests! Python 3 support! Extra hugs!
1.0
* Initial release.

View File

@@ -1,22 +1,45 @@
MinecraftQuery
======
mcstatus
========
With Minecraft (old-beta-release-)1.9 came a shiny new tool to allow native status querying in vanilla Minecraft servers.
This is a simple class designed to aid in retrieving any data from the servers.
See it in action: [http://dinnerbone.com/minecraft/tools/status/](http://dinnerbone.com/minecraft/tools/status/)
Protocol documention: [http://dinnerbone.com/blog/2011/10/14/minecraft-19-has-rcon-and-query/](http://dinnerbone.com/blog/2011/10/14/minecraft-19-has-rcon-and-query/)
`mcstatus` provides an easy way to query Minecraft servers for any information they can expose.
It provides three modes of access (`query`, `status` and `ping`), the differences of which are listed below in usage.
Usage
-----------
-----
from minecraft_query import MinecraftQuery
query = MinecraftQuery("localhost", 25565)
basic_status = query.get_status()
print "The server has %d players" % (basic_status['numplayers'])
full_info = query.get_rules()
print "The server is on the map '%s'" % (full_info['map'])
```python
from mcstatus import MinecraftServer
# If you know the host and port, you may skip this and use MinecraftServer("example.org", 1234)
server = MinecraftServer.lookup("example.org:1234")
# 'status' is supported by all Minecraft servers that are version 1.7 or higher.
status = server.status()
print("The server has {0} players and replied in {1} ms".format(status.players.online, status.latency))
# 'ping' is supported by all Minecraft servers that are version 1.7 or higher.
# It is included in a 'status' call, but is exposed separate if you do not require the additional info.
latency = server.ping()
print("The server replied in {0} ms".format(latency))
# 'query' has to be enabled in a servers' server.properties file.
# It may give more information than a ping, such as a full player list or mod information.
query = server.query()
print("The server has the following players online: {0}".format(", ".join(query.players.names)))
```
Installation
------------
mcstatus is available on pypi, and can be installed trivially with:
```bash
pip install mcstatus
```
Alternatively, just clone this repo!
License
-------
mcstatus is licensed under Apache 2.0.

44
cli.py
View File

@@ -1,44 +0,0 @@
#!/usr/bin/env python
import socket
import sys
from pprint import pprint
from argparse import ArgumentParser
from minecraft_query import MinecraftQuery
def main():
parser = ArgumentParser(description="Query status of Minecraft multiplayer server",
epilog="Exit status: 0 if the server can be reached, otherwise nonzero."
)
parser.add_argument("host", help="target hostname")
parser.add_argument("-q", "--quiet", action='store_true', default=False,
help='don\'t print anything, just check if the server is running')
parser.add_argument("-p", "--port", type=int, default=25565,
help='UDP port of server\'s "query" service [25565]')
parser.add_argument("-r", "--retries", type=int, default=3,
help='retry query at most this number of times [3]')
parser.add_argument("-t", "--timeout", type=int, default=10,
help='retry timeout in seconds [10]')
options = parser.parse_args()
try:
query = MinecraftQuery(options.host, options.port,
timeout=options.timeout,
retries=options.retries)
server_data = query.get_rules()
except socket.error as e:
if not options.quiet:
print "socket exception caught:", e.message
print "Server is down or unreachable."
sys.exit(1)
if not options.quiet:
print "Server response data:"
pprint(server_data)
sys.exit(0)
if __name__=="__main__":
main()

1
mcstatus/__init__.py Normal file
View File

@@ -0,0 +1 @@
from mcstatus.server import MinecraftServer

146
mcstatus/pinger.py Normal file
View File

@@ -0,0 +1,146 @@
import datetime
import json
import random
from six import string_types
from mcstatus.protocol.connection import Connection
class ServerPinger:
def __init__(self, connection, host="", port=0, version=47, ping_token=None):
if ping_token is None:
ping_token = random.randint(0, (1 << 63) - 1)
self.version = version
self.connection = connection
self.host = host
self.port = port
self.ping_token = ping_token
def handshake(self):
packet = Connection()
packet.write_varint(0)
packet.write_varint(self.version)
packet.write_utf(self.host)
packet.write_short(self.port)
packet.write_varint(1) # Intention to query status
self.connection.write_buffer(packet)
def read_status(self):
request = Connection()
request.write_varint(0) # Request status
self.connection.write_buffer(request)
response = self.connection.read_buffer()
if response.read_varint() != 0:
raise IOError("Received invalid status response packet.")
try:
raw = json.loads(response.read_utf())
except ValueError:
raise IOError("Received invalid JSON")
try:
return PingResponse(raw)
except ValueError as e:
raise IOError("Received invalid status response: %s" % e)
def test_ping(self):
request = Connection()
request.write_varint(1) # Test ping
request.write_long(self.ping_token)
sent = datetime.datetime.now()
self.connection.write_buffer(request)
response = self.connection.read_buffer()
received = datetime.datetime.now()
if response.read_varint() != 1:
raise IOError("Received invalid ping response packet.")
received_token = response.read_long()
if received_token != self.ping_token:
raise IOError("Received mangled ping response packet (expected token %d, received %d)" % (
self.ping_token, received_token))
delta = (received - sent)
# We have no trivial way of getting a time delta :(
return (delta.days * 24 * 60 * 60 + delta.seconds) * 1000 + delta.microseconds / 1000.0
class PingResponse:
class Players:
class Player:
def __init__(self, raw):
if type(raw) is not dict:
raise ValueError("Invalid player object (expected dict, found %s" % type(raw))
if "name" not in raw:
raise ValueError("Invalid player object (no 'name' value)")
if not isinstance(raw["name"], string_types):
raise ValueError("Invalid player object (expected 'name' to be str, was %s)" % type(raw["name"]))
self.name = raw["name"]
if "id" not in raw:
raise ValueError("Invalid player object (no 'id' value)")
if not isinstance(raw["id"], string_types):
raise ValueError("Invalid player object (expected 'id' to be str, was %s)" % type(raw["id"]))
self.id = raw["id"]
def __init__(self, raw):
if type(raw) is not dict:
raise ValueError("Invalid players object (expected dict, found %s" % type(raw))
if "online" not in raw:
raise ValueError("Invalid players object (no 'online' value)")
if type(raw["online"]) is not int:
raise ValueError("Invalid players object (expected 'online' to be int, was %s)" % type(raw["online"]))
self.online = raw["online"]
if "max" not in raw:
raise ValueError("Invalid players object (no 'max' value)")
if type(raw["max"]) is not int:
raise ValueError("Invalid players object (expected 'max' to be int, was %s)" % type(raw["max"]))
self.max = raw["max"]
if "sample" in raw:
if type(raw["sample"]) is not list:
raise ValueError("Invalid players object (expected 'sample' to be list, was %s)" % type(raw["max"]))
self.sample = [PingResponse.Players.Player(p) for p in raw["sample"]]
else:
self.sample = None
class Version:
def __init__(self, raw):
if type(raw) is not dict:
raise ValueError("Invalid version object (expected dict, found %s" % type(raw))
if "name" not in raw:
raise ValueError("Invalid version object (no 'name' value)")
if not isinstance(raw["name"], string_types):
raise ValueError("Invalid version object (expected 'name' to be str, was %s)" % type(raw["name"]))
self.name = raw["name"]
if "protocol" not in raw:
raise ValueError("Invalid version object (no 'protocol' value)")
if type(raw["protocol"]) is not int:
raise ValueError("Invalid version object (expected 'protocol' to be int, was %s)" % type(raw["protocol"]))
self.protocol = raw["protocol"]
def __init__(self, raw):
self.raw = raw
if "players" not in raw:
raise ValueError("Invalid status object (no 'players' value)")
self.players = PingResponse.Players(raw["players"])
if "version" not in raw:
raise ValueError("Invalid status object (no 'version' value)")
self.version = PingResponse.Version(raw["version"])
if "description" not in raw:
raise ValueError("Invalid status object (no 'description' value)")
self.description = raw["description"]
if "favicon" in raw:
self.favicon = raw["favicon"]
else:
self.favicon = None
self.latency = None

View File

View File

@@ -0,0 +1,175 @@
import socket
import struct
class Connection:
def __init__(self):
self.sent = bytearray()
self.received = bytearray()
def read(self, length):
result = self.received[:length]
self.received = self.received[length:]
return result
def write(self, data):
if isinstance(data, Connection):
data = bytearray(data.flush())
if isinstance(data, str):
data = bytearray(data)
self.sent.extend(data)
def receive(self, data):
if not isinstance(data, bytearray):
data = bytearray(data)
self.received.extend(data)
def remaining(self):
return len(self.received)
def flush(self):
result = self.sent
self.sent = ""
return result
def _unpack(self, format, data):
return struct.unpack(">" + format, bytes(data))[0]
def _pack(self, format, data):
return struct.pack(">" + format, data)
def read_varint(self):
result = 0
for i in range(5):
part = ord(self.read(1))
result |= (part & 0x7F) << 7 * i
if not part & 0x80:
return result
raise IOError("Server sent a varint that was too big!")
def write_varint(self, value):
remaining = value
for i in range(5):
if remaining & ~0x7F == 0:
self.write(struct.pack("!B", remaining))
return
self.write(struct.pack("!B", remaining & 0x7F | 0x80))
remaining >>= 7
raise ValueError("The value %d is too big to send in a varint" % value)
def read_utf(self):
length = self.read_varint()
return self.read(length).decode('utf8')
def write_utf(self, value):
self.write_varint(len(value))
self.write(bytearray(value, 'utf8'))
def read_ascii(self):
result = bytearray()
while len(result) == 0 or result[-1] != 0:
result.extend(self.read(1))
return result[:-1].decode("ISO-8859-1")
def write_ascii(self, value):
self.write(bytearray(value, 'ISO-8859-1'))
self.write(bytearray.fromhex("00"))
def read_short(self):
return self._unpack("h", self.read(2))
def write_short(self, value):
self.write(self._pack("h", value))
def read_ushort(self):
return self._unpack("H", self.read(2))
def write_ushort(self, value):
self.write(self._pack("H", value))
def read_int(self):
return self._unpack("i", self.read(4))
def write_int(self, value):
self.write(self._pack("i", value))
def read_uint(self):
return self._unpack("I", self.read(4))
def write_uint(self, value):
self.write(self._pack("I", value))
def read_long(self):
return self._unpack("q", self.read(8))
def write_long(self, value):
self.write(self._pack("q", value))
def read_ulong(self):
return self._unpack("Q", self.read(8))
def write_ulong(self, value):
self.write(self._pack("Q", value))
def read_buffer(self):
length = self.read_varint()
result = Connection()
result.receive(self.read(length))
return result
def write_buffer(self, buffer):
data = buffer.flush()
self.write_varint(len(data))
self.write(data)
class TCPSocketConnection(Connection):
def __init__(self, addr, timeout=3):
Connection.__init__(self)
self.socket = socket.create_connection(addr, timeout=timeout)
def flush(self):
raise TypeError("TCPSocketConnection does not support flush()")
def receive(self, data):
raise TypeError("TCPSocketConnection does not support receive()")
def remaining(self):
raise TypeError("TCPSocketConnection does not support remaining()")
def read(self, length):
result = bytearray()
while len(result) < length:
result.extend(self.socket.recv(length - len(result)))
return result
def write(self, data):
self.socket.send(data)
class UDPSocketConnection(Connection):
def __init__(self, addr, timeout=3):
Connection.__init__(self)
self.addr = addr
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.socket.settimeout(timeout)
def flush(self):
raise TypeError("UDPSocketConnection does not support flush()")
def receive(self, data):
raise TypeError("UDPSocketConnection does not support receive()")
def remaining(self):
return 65535
def read(self, length):
result = bytearray()
while len(result) == 0:
result.extend(self.socket.recvfrom(self.remaining())[0])
return result
def write(self, data):
if isinstance(data, Connection):
data = bytearray(data.flush())
self.socket.sendto(data, self.addr)

90
mcstatus/querier.py Normal file
View File

@@ -0,0 +1,90 @@
import struct
from mcstatus.protocol.connection import Connection
class ServerQuerier:
MAGIC_PREFIX = bytearray.fromhex("FEFD")
PACKET_TYPE_CHALLENGE = 9
PACKET_TYPE_QUERY = 0
def __init__(self, connection):
self.connection = connection
self.challenge = 0
def _create_packet(self, id):
packet = Connection()
packet.write(self.MAGIC_PREFIX)
packet.write(struct.pack("!B", id))
packet.write_uint(0)
packet.write_int(self.challenge)
return packet
def _read_packet(self):
packet = Connection()
packet.receive(self.connection.read(self.connection.remaining()))
packet.read(1 + 4)
return packet
def handshake(self):
self.connection.write(self._create_packet(self.PACKET_TYPE_CHALLENGE))
packet = self._read_packet()
self.challenge = int(packet.read_ascii())
def read_query(self):
request = self._create_packet(self.PACKET_TYPE_QUERY)
request.write_uint(0)
self.connection.write(request)
response = self._read_packet()
response.read(len("splitnum") + 1 + 1 + 1)
data = {}
players = []
while True:
key = response.read_ascii()
if len(key) == 0:
response.read(1)
break
value = response.read_ascii()
data[key] = value
response.read(len("player_") + 1 + 1)
while True:
name = response.read_ascii()
if len(name) == 0:
break
players.append(name)
return QueryResponse(data, players)
class QueryResponse:
class Players:
def __init__(self, online, max, names):
self.online = int(online)
self.max = int(max)
self.names = names
class Software:
def __init__(self, version, plugins):
self.version = version
self.brand = "vanilla"
self.plugins = []
if plugins:
parts = plugins.split(":", 1)
self.brand = parts[0].strip()
if len(parts) == 2:
self.plugins = [s.strip() for s in parts[1].split(";")]
def __init__(self, raw, players):
self.raw = raw
self.motd = raw["hostname"]
self.map = raw["map"]
self.players = QueryResponse.Players(raw["numplayers"], raw["maxplayers"], players)
self.software = QueryResponse.Software(raw["version"], raw["plugins"])

77
mcstatus/server.py Normal file
View File

@@ -0,0 +1,77 @@
from mcstatus.pinger import ServerPinger
from mcstatus.protocol.connection import TCPSocketConnection, UDPSocketConnection
from mcstatus.querier import ServerQuerier
import dns.resolver
class MinecraftServer:
def __init__(self, host, port=25565):
self.host = host
self.port = port
@staticmethod
def lookup(address):
host = address
port = None
if ":" in address:
parts = address.split(":")
if len(parts) > 2:
raise ValueError("Invalid address '%s'" % address)
host = parts[0]
port = int(parts[1])
if port is None:
port = 25565
try:
answers = dns.resolver.query("_minecraft._tcp." + host, "SRV")
if len(answers):
answer = answers[0]
host = str(answer.target).rstrip(".")
port = int(answer.port)
except Exception:
pass
return MinecraftServer(host, port)
def ping(self, retries=3, **kwargs):
attempt = 0
connection = TCPSocketConnection((self.host, self.port))
exception = None
while attempt < retries:
try:
pinger = ServerPinger(connection, host=self.host, port=self.port, **kwargs)
pinger.handshake()
return pinger.test_ping()
except Exception as e:
exception = e
attempt += 1
raise exception
def status(self, retries=3, **kwargs):
attempt = 0
connection = TCPSocketConnection((self.host, self.port))
exception = None
while attempt < retries:
try:
pinger = ServerPinger(connection, host=self.host, port=self.port, **kwargs)
pinger.handshake()
result = pinger.read_status()
result.latency = pinger.test_ping()
return result
except Exception as e:
exception = e
attempt += 1
raise exception
def query(self, retries=3):
attempt = 0
exception = None
while attempt < retries:
try:
connection = UDPSocketConnection((self.host, self.port))
querier = ServerQuerier(connection)
querier.handshake()
return querier.read_query()
except Exception as e:
exception = e
attempt += 1
raise exception

View File

@@ -0,0 +1,260 @@
from unittest import TestCase
from mock import Mock, patch
from mcstatus.protocol.connection import Connection, TCPSocketConnection, UDPSocketConnection
class TestConnection(TestCase):
def setUp(self):
self.connection = Connection()
def test_flush(self):
self.connection.sent = bytearray.fromhex("7FAABB")
self.assertEqual(self.connection.flush(), bytearray.fromhex("7FAABB"))
self.assertTrue(self.connection.sent == "")
def test_receive(self):
self.connection.receive(bytearray.fromhex("7F"))
self.connection.receive(bytearray.fromhex("AABB"))
self.assertEqual(self.connection.received, bytearray.fromhex("7FAABB"))
def test_remaining(self):
self.connection.receive(bytearray.fromhex("7F"))
self.connection.receive(bytearray.fromhex("AABB"))
self.assertEqual(self.connection.remaining(), 3)
def test_send(self):
self.connection.write(bytearray.fromhex("7F"))
self.connection.write(bytearray.fromhex("AABB"))
self.assertEqual(self.connection.flush(), bytearray.fromhex("7FAABB"))
def test_read(self):
self.connection.receive(bytearray.fromhex("7FAABB"))
self.assertEqual(self.connection.read(2), bytearray.fromhex("7FAA"))
self.assertEqual(self.connection.read(1), bytearray.fromhex("BB"))
def test_readSimpleVarInt(self):
self.connection.receive(bytearray.fromhex("0F"))
self.assertEqual(self.connection.read_varint(), 15)
def test_writeSimpleVarInt(self):
self.connection.write_varint(15)
self.assertEqual(self.connection.flush(), bytearray.fromhex("0F"))
def test_readBigVarInt(self):
self.connection.receive(bytearray.fromhex("FFFFFFFF7F"))
self.assertEqual(self.connection.read_varint(), 34359738367)
def test_writeBigVarInt(self):
self.connection.write_varint(2147483647)
self.assertEqual(self.connection.flush(), bytearray.fromhex("FFFFFFFF07"))
def test_readInvalidVarInt(self):
self.connection.receive(bytearray.fromhex("FFFFFFFF80"))
self.assertRaises(IOError, self.connection.read_varint)
def test_writeInvalidVarInt(self):
self.assertRaises(ValueError, self.connection.write_varint, 34359738368)
def test_readUtf(self):
self.connection.receive(bytearray.fromhex("0D48656C6C6F2C20776F726C6421"))
self.assertEqual(self.connection.read_utf(), "Hello, world!")
def test_writeUtf(self):
self.connection.write_utf("Hello, world!")
self.assertEqual(self.connection.flush(), bytearray.fromhex("0D48656C6C6F2C20776F726C6421"))
def test_readEmptyUtf(self):
self.connection.write_utf("")
self.assertEqual(self.connection.flush(), bytearray.fromhex("00"))
def test_readAscii(self):
self.connection.receive(bytearray.fromhex("48656C6C6F2C20776F726C642100"))
self.assertEqual(self.connection.read_ascii(), "Hello, world!")
def test_writeAscii(self):
self.connection.write_ascii("Hello, world!")
self.assertEqual(self.connection.flush(), bytearray.fromhex("48656C6C6F2C20776F726C642100"))
def test_readEmptyAscii(self):
self.connection.write_ascii("")
self.assertEqual(self.connection.flush(), bytearray.fromhex("00"))
def test_readShortNegative(self):
self.connection.receive(bytearray.fromhex("8000"))
self.assertEqual(self.connection.read_short(), -32768)
def test_writeShortNegative(self):
self.connection.write_short(-32768)
self.assertEqual(self.connection.flush(), bytearray.fromhex("8000"))
def test_readShortPositive(self):
self.connection.receive(bytearray.fromhex("7FFF"))
self.assertEqual(self.connection.read_short(), 32767)
def test_writeShortPositive(self):
self.connection.write_short(32767)
self.assertEqual(self.connection.flush(), bytearray.fromhex("7FFF"))
def test_readUShortPositive(self):
self.connection.receive(bytearray.fromhex("8000"))
self.assertEqual(self.connection.read_ushort(), 32768)
def test_writeUShortPositive(self):
self.connection.write_ushort(32768)
self.assertEqual(self.connection.flush(), bytearray.fromhex("8000"))
def test_readIntNegative(self):
self.connection.receive(bytearray.fromhex("80000000"))
self.assertEqual(self.connection.read_int(), -2147483648)
def test_writeIntNegative(self):
self.connection.write_int(-2147483648)
self.assertEqual(self.connection.flush(), bytearray.fromhex("80000000"))
def test_readIntPositive(self):
self.connection.receive(bytearray.fromhex("7FFFFFFF"))
self.assertEqual(self.connection.read_int(), 2147483647)
def test_writeIntPositive(self):
self.connection.write_int(2147483647)
self.assertEqual(self.connection.flush(), bytearray.fromhex("7FFFFFFF"))
def test_readUIntPositive(self):
self.connection.receive(bytearray.fromhex("80000000"))
self.assertEqual(self.connection.read_uint(), 2147483648)
def test_writeUIntPositive(self):
self.connection.write_uint(2147483648)
self.assertEqual(self.connection.flush(), bytearray.fromhex("80000000"))
def test_readLongNegative(self):
self.connection.receive(bytearray.fromhex("8000000000000000"))
self.assertEqual(self.connection.read_long(), -9223372036854775808)
def test_writeLongNegative(self):
self.connection.write_long(-9223372036854775808)
self.assertEqual(self.connection.flush(), bytearray.fromhex("8000000000000000"))
def test_readLongPositive(self):
self.connection.receive(bytearray.fromhex("7FFFFFFFFFFFFFFF"))
self.assertEqual(self.connection.read_long(), 9223372036854775807)
def test_writeLongPositive(self):
self.connection.write_long(9223372036854775807)
self.assertEqual(self.connection.flush(), bytearray.fromhex("7FFFFFFFFFFFFFFF"))
def test_readULongPositive(self):
self.connection.receive(bytearray.fromhex("8000000000000000"))
self.assertEqual(self.connection.read_ulong(), 9223372036854775808)
def test_writeULongPositive(self):
self.connection.write_ulong(9223372036854775808)
self.assertEqual(self.connection.flush(), bytearray.fromhex("8000000000000000"))
def test_readBuffer(self):
self.connection.receive(bytearray.fromhex("027FAA"))
buffer = self.connection.read_buffer()
self.assertEqual(buffer.received, bytearray.fromhex("7FAA"))
self.assertEqual(self.connection.flush(), bytearray())
def test_writeBuffer(self):
buffer = Connection()
buffer.write(bytearray.fromhex("7FAA"))
self.connection.write_buffer(buffer)
self.assertEqual(self.connection.flush(), bytearray.fromhex("027FAA"))
class TCPSocketConnectionTest(TestCase):
def setUp(self):
socket = Mock()
socket.recv = Mock()
socket.send = Mock()
with patch("socket.create_connection") as create_connection:
create_connection.return_value = socket
self.connection = TCPSocketConnection(("localhost", 1234))
def test_flush(self):
self.assertRaises(TypeError, self.connection.flush)
def test_receive(self):
self.assertRaises(TypeError, self.connection.receive, "")
def test_remaining(self):
self.assertRaises(TypeError, self.connection.remaining)
def test_read(self):
self.connection.socket.recv.return_value = bytearray.fromhex("7FAA")
self.assertEqual(self.connection.read(2), bytearray.fromhex("7FAA"))
def test_write(self):
self.connection.write(bytearray.fromhex("7FAA"))
self.connection.socket.send.assert_called_once_with(bytearray.fromhex("7FAA"))
class UDPSocketConnectionTest(TestCase):
def setUp(self):
socket = Mock()
socket.recvfrom = Mock()
socket.sendto = Mock()
with patch("socket.socket") as create_socket:
create_socket.return_value = socket
self.connection = UDPSocketConnection(("localhost", 1234))
def test_flush(self):
self.assertRaises(TypeError, self.connection.flush)
def test_receive(self):
self.assertRaises(TypeError, self.connection.receive, "")
def test_remaining(self):
self.assertEqual(self.connection.remaining(), 65535)
def test_read(self):
self.connection.socket.recvfrom.return_value = [bytearray.fromhex("7FAA")]
self.assertEqual(self.connection.read(2), bytearray.fromhex("7FAA"))
def test_write(self):
self.connection.write(bytearray.fromhex("7FAA"))
self.connection.socket.sendto.assert_called_once_with(bytearray.fromhex("7FAA"), ("localhost", 1234))

View File

@@ -0,0 +1,184 @@
from unittest import TestCase
from mcstatus.protocol.connection import Connection
from mcstatus.pinger import ServerPinger, PingResponse
from mcstatus.server import MinecraftServer
class TestServerPinger(TestCase):
def setUp(self):
self.pinger = ServerPinger(Connection(), host="localhost", port=25565, version=44)
def test_handshake(self):
self.pinger.handshake()
self.assertEqual(self.pinger.connection.flush(), bytearray.fromhex("0F002C096C6F63616C686F737463DD01"))
def test_read_status(self):
self.pinger.connection.receive(bytearray.fromhex("7200707B226465736372697074696F6E223A2241204D696E65637261667420536572766572222C22706C6179657273223A7B226D6178223A32302C226F6E6C696E65223A307D2C2276657273696F6E223A7B226E616D65223A22312E382D70726531222C2270726F746F636F6C223A34347D7D"))
status = self.pinger.read_status()
self.assertEqual(status.raw, {"description":"A Minecraft Server","players":{"max":20,"online":0},"version":{"name":"1.8-pre1","protocol":44}})
self.assertEqual(self.pinger.connection.flush(), bytearray.fromhex("0100"))
def test_read_status_invalid_json(self):
self.pinger.connection.receive(bytearray.fromhex("0300017B"))
self.assertRaises(IOError, self.pinger.read_status)
def test_read_status_invalid_reply(self):
self.pinger.connection.receive(bytearray.fromhex("4F004D7B22706C6179657273223A7B226D6178223A32302C226F6E6C696E65223A307D2C2276657273696F6E223A7B226E616D65223A22312E382D70726531222C2270726F746F636F6C223A34347D7D"))
self.assertRaises(IOError, self.pinger.read_status)
def test_read_status_invalid_status(self):
self.pinger.connection.receive(bytearray.fromhex("0105"))
self.assertRaises(IOError, self.pinger.read_status)
def test_test_ping(self):
self.pinger.connection.receive(bytearray.fromhex("09010000000000DD7D1C"))
self.pinger.ping_token = 14515484
self.assertTrue(self.pinger.test_ping() >= 0)
self.assertEqual(self.pinger.connection.flush(), bytearray.fromhex("09010000000000DD7D1C"))
def test_test_ping_invalid(self):
self.pinger.connection.receive(bytearray.fromhex("011F"))
self.pinger.ping_token = 14515484
self.assertRaises(IOError, self.pinger.test_ping)
def test_test_ping_wrong_token(self):
self.pinger.connection.receive(bytearray.fromhex("09010000000000DD7D1C"))
self.pinger.ping_token = 12345
self.assertRaises(IOError, self.pinger.test_ping)
class TestPingResponse(TestCase):
def test_raw(self):
response = PingResponse({"description":"A Minecraft Server","players":{"max":20,"online":0},"version":{"name":"1.8-pre1","protocol":44}})
self.assertEqual(response.raw, {"description":"A Minecraft Server","players":{"max":20,"online":0},"version":{"name":"1.8-pre1","protocol":44}})
def test_description(self):
response = PingResponse({"description":"A Minecraft Server","players":{"max":20,"online":0},"version":{"name":"1.8-pre1","protocol":44}})
self.assertEqual(response.description, "A Minecraft Server")
def test_description_missing(self):
self.assertRaises(ValueError, PingResponse, {"players":{"max":20,"online":0},"version":{"name":"1.8-pre1","protocol":44}})
def test_version(self):
response = PingResponse({"description":"A Minecraft Server","players":{"max":20,"online":0},"version":{"name":"1.8-pre1","protocol":44}})
self.assertIsNotNone(response.version)
self.assertEqual(response.version.name, "1.8-pre1")
self.assertEqual(response.version.protocol, 44)
def test_version_missing(self):
self.assertRaises(ValueError, PingResponse, {"description":"A Minecraft Server","players":{"max":20,"online":0}})
def test_version_invalid(self):
self.assertRaises(ValueError, PingResponse, {"description":"A Minecraft Server","players":{"max":20,"online":0},"version":"foo"})
def test_players(self):
response = PingResponse({"description":"A Minecraft Server","players":{"max":20,"online":5},"version":{"name":"1.8-pre1","protocol":44}})
self.assertIsNotNone(response.players)
self.assertEqual(response.players.max, 20)
self.assertEqual(response.players.online, 5)
def test_players_missing(self):
self.assertRaises(ValueError, PingResponse, {"description":"A Minecraft Server","version":{"name":"1.8-pre1","protocol":44}})
def test_favicon(self):
response = PingResponse({"description":"A Minecraft Server","players":{"max":20,"online":0},"version":{"name":"1.8-pre1","protocol":44},"favicon":"data:image/png;base64,foo"})
self.assertEqual(response.favicon, "data:image/png;base64,foo")
def test_favicon_missing(self):
response = PingResponse({"description":"A Minecraft Server","players":{"max":20,"online":0},"version":{"name":"1.8-pre1","protocol":44}})
self.assertIsNone(response.favicon)
class TestPingResponsePlayers(TestCase):
def test_invalid(self):
self.assertRaises(ValueError, PingResponse.Players, "foo")
def test_max_missing(self):
self.assertRaises(ValueError, PingResponse.Players, {"online":5})
def test_max_invalid(self):
self.assertRaises(ValueError, PingResponse.Players, {"max":"foo","online":5})
def test_online_missing(self):
self.assertRaises(ValueError, PingResponse.Players, {"max":20})
def test_online_invalid(self):
self.assertRaises(ValueError, PingResponse.Players, {"max":20,"online":"foo"})
def test_valid(self):
players = PingResponse.Players({"max":20,"online":5})
self.assertEqual(players.max, 20)
self.assertEqual(players.online, 5)
def test_sample(self):
players = PingResponse.Players({"max":20,"online":1,"sample":[{"name":'Dinnerbone','id':"61699b2e-d327-4a01-9f1e-0ea8c3f06bc6"}]})
self.assertIsNotNone(players.sample)
self.assertEqual(players.sample[0].name, "Dinnerbone")
def test_sample_invalid(self):
self.assertRaises(ValueError, PingResponse.Players, {"max":20,"online":1,"sample":"foo"})
def test_sample_missing(self):
players = PingResponse.Players({"max":20,"online":1})
self.assertIsNone(players.sample)
class TestPingResponsePlayersPlayer(TestCase):
def test_invalid(self):
self.assertRaises(ValueError, PingResponse.Players.Player, "foo")
def test_name_missing(self):
self.assertRaises(ValueError, PingResponse.Players.Player, {"id":"61699b2e-d327-4a01-9f1e-0ea8c3f06bc6"})
def test_name_invalid(self):
self.assertRaises(ValueError, PingResponse.Players.Player, {"name":{},"id":"61699b2e-d327-4a01-9f1e-0ea8c3f06bc6"})
def test_id_missing(self):
self.assertRaises(ValueError, PingResponse.Players.Player, {"name":"Dinnerbone"})
def test_id_invalid(self):
self.assertRaises(ValueError, PingResponse.Players.Player, {"name":"Dinnerbone","id":{}})
def test_valid(self):
player = PingResponse.Players.Player({"name":'Dinnerbone','id':"61699b2e-d327-4a01-9f1e-0ea8c3f06bc6"})
self.assertEqual(player.name, "Dinnerbone")
self.assertEqual(player.id, "61699b2e-d327-4a01-9f1e-0ea8c3f06bc6")
class TestPingResponseVersion(TestCase):
def test_invalid(self):
self.assertRaises(ValueError, PingResponse.Version, "foo")
def test_protocol_missing(self):
self.assertRaises(ValueError, PingResponse.Version, {"name":"foo"})
def test_protocol_invalid(self):
self.assertRaises(ValueError, PingResponse.Version, {"name":"foo","protocol":"bar"})
def test_name_missing(self):
self.assertRaises(ValueError, PingResponse.Version, {"protocol":5})
def test_name_invalid(self):
self.assertRaises(ValueError, PingResponse.Version, {"name":{},"protocol":5})
def test_valid(self):
players = PingResponse.Version({"name":"foo","protocol":5})
self.assertEqual(players.name, "foo")
self.assertEqual(players.protocol, 5)

View File

@@ -0,0 +1,94 @@
from unittest import TestCase
from mcstatus.protocol.connection import Connection
from mcstatus.querier import ServerQuerier, QueryResponse
class TestQuerier(TestCase):
def setUp(self):
self.querier = ServerQuerier(Connection())
def test_handshake(self):
self.querier.connection.receive(bytearray.fromhex("090000000035373033353037373800"))
self.querier.handshake()
self.assertEqual(self.querier.connection.flush(), bytearray.fromhex("FEFD090000000000000000"))
self.assertEqual(self.querier.challenge, 570350778)
def test_query(self):
self.querier.connection.receive(bytearray.fromhex("00000000000000000000000000000000686f73746e616d650041204d696e656372616674205365727665720067616d657479706500534d500067616d655f6964004d494e4543524146540076657273696f6e00312e3800706c7567696e7300006d617000776f726c64006e756d706c61796572730033006d6178706c617965727300323000686f7374706f727400323535363500686f73746970003139322e3136382e35362e31000001706c617965725f000044696e6e6572626f6e6500446a696e6e69626f6e650053746576650000"))
response = self.querier.read_query()
self.assertEqual(self.querier.connection.flush(), bytearray.fromhex("FEFD00000000000000000000000000"))
self.assertEqual(response.raw, {
"hostname": "A Minecraft Server",
"gametype": "SMP",
"game_id": "MINECRAFT",
"version": "1.8",
"plugins": "",
"map": "world",
"numplayers": "3",
"maxplayers": "20",
"hostport": "25565",
"hostip": "192.168.56.1",
})
self.assertEqual(response.players.names, ["Dinnerbone", "Djinnibone", "Steve"])
class TestQueryResponse(TestCase):
def setUp(self):
self.raw = {
"hostname": "A Minecraft Server",
"gametype": "SMP",
"game_id": "MINECRAFT",
"version": "1.8",
"plugins": "",
"map": "world",
"numplayers": "3",
"maxplayers": "20",
"hostport": "25565",
"hostip": "192.168.56.1",
}
self.players = ["Dinnerbone", "Djinnibone", "Steve"]
def test_valid(self):
response = QueryResponse(self.raw, self.players)
self.assertEqual(response.motd, "A Minecraft Server")
self.assertEqual(response.map, "world")
self.assertEqual(response.players.online, 3)
self.assertEqual(response.players.max, 20)
self.assertEqual(response.players.names, ["Dinnerbone", "Djinnibone", "Steve"])
self.assertEqual(response.software.brand, "vanilla")
self.assertEqual(response.software.version, "1.8")
self.assertEqual(response.software.plugins, [])
class TestQueryResponsePlayers(TestCase):
def test_valid(self):
players = QueryResponse.Players(5, 20, ["Dinnerbone", "Djinnibone", "Steve"])
self.assertEqual(players.online, 5)
self.assertEqual(players.max, 20)
self.assertEqual(players.names, ["Dinnerbone", "Djinnibone", "Steve"])
class TestQueryResponseSoftware(TestCase):
def test_vanilla(self):
software = QueryResponse.Software("1.8", "")
self.assertEqual(software.brand, "vanilla")
self.assertEqual(software.version, "1.8")
self.assertEqual(software.plugins, [])
def test_modded(self):
software = QueryResponse.Software("1.8", "A modded server: Foo 1.0; Bar 2.0; Baz 3.0")
self.assertEqual(software.brand, "A modded server")
self.assertEqual(software.plugins, ["Foo 1.0", "Bar 2.0", "Baz 3.0"])
def test_modded_no_plugins(self):
software = QueryResponse.Software("1.8", "A modded server")
self.assertEqual(software.brand, "A modded server")
self.assertEqual(software.plugins, [])

View File

@@ -0,0 +1,122 @@
from unittest import TestCase
from mock import patch, Mock
from mcstatus.protocol.connection import Connection
from mcstatus.server import MinecraftServer
class TestMinecraftServer(TestCase):
def setUp(self):
self.socket = Connection()
self.server = MinecraftServer("localhost", port=25565)
def test_ping(self):
self.socket.receive(bytearray.fromhex("09010000000001C54246"))
with patch("mcstatus.server.TCPSocketConnection") as connection:
connection.return_value = self.socket
latency = self.server.ping(ping_token=29704774, version=47)
self.assertEqual(self.socket.flush(), bytearray.fromhex("0F002F096C6F63616C686F737463DD0109010000000001C54246"))
self.assertEqual(self.socket.remaining(), 0, msg="Data is pending to be read, but should be empty")
self.assertTrue(latency >= 0)
def test_ping_retry(self):
with patch("mcstatus.server.TCPSocketConnection") as connection:
connection.return_value = None
with patch("mcstatus.server.ServerPinger") as pinger:
pinger.side_effect = [Exception, Exception, Exception]
self.assertRaises(Exception, self.server.ping)
self.assertEqual(pinger.call_count, 3)
def test_status(self):
self.socket.receive(bytearray.fromhex("6D006B7B226465736372697074696F6E223A2241204D696E65637261667420536572766572222C22706C6179657273223A7B226D6178223A32302C226F6E6C696E65223A307D2C2276657273696F6E223A7B226E616D65223A22312E38222C2270726F746F636F6C223A34377D7D09010000000001C54246"))
with patch("mcstatus.server.TCPSocketConnection") as connection:
connection.return_value = self.socket
info = self.server.status(ping_token=29704774, version=47)
self.assertEqual(self.socket.flush(), bytearray.fromhex("0F002F096C6F63616C686F737463DD01010009010000000001C54246"))
self.assertEqual(self.socket.remaining(), 0, msg="Data is pending to be read, but should be empty")
self.assertEqual(info.raw, {"description":"A Minecraft Server","players":{"max":20,"online":0},"version":{"name":"1.8","protocol":47}})
self.assertTrue(info.latency >= 0)
def test_status_retry(self):
with patch("mcstatus.server.TCPSocketConnection") as connection:
connection.return_value = None
with patch("mcstatus.server.ServerPinger") as pinger:
pinger.side_effect = [Exception, Exception, Exception]
self.assertRaises(Exception, self.server.status)
self.assertEqual(pinger.call_count, 3)
def test_query(self):
self.socket.receive(bytearray.fromhex("090000000035373033353037373800"))
self.socket.receive(bytearray.fromhex("00000000000000000000000000000000686f73746e616d650041204d696e656372616674205365727665720067616d657479706500534d500067616d655f6964004d494e4543524146540076657273696f6e00312e3800706c7567696e7300006d617000776f726c64006e756d706c61796572730033006d6178706c617965727300323000686f7374706f727400323535363500686f73746970003139322e3136382e35362e31000001706c617965725f000044696e6e6572626f6e6500446a696e6e69626f6e650053746576650000"))
self.socket.remaining = Mock()
self.socket.remaining.side_effect = [15, 208]
with patch("mcstatus.server.UDPSocketConnection") as connection:
connection.return_value = self.socket
info = self.server.query()
self.assertEqual(self.socket.flush(), bytearray.fromhex("FEFD090000000000000000FEFD000000000021FEDCBA00000000"))
self.assertEqual(info.raw, {
"hostname": "A Minecraft Server",
"gametype": "SMP",
"game_id": "MINECRAFT",
"version": "1.8",
"plugins": "",
"map": "world",
"numplayers": "3",
"maxplayers": "20",
"hostport": "25565",
"hostip": "192.168.56.1",
})
def test_query_retry(self):
with patch("mcstatus.server.UDPSocketConnection") as connection:
connection.return_value = None
with patch("mcstatus.server.ServerQuerier") as querier:
querier.side_effect = [Exception, Exception, Exception]
self.assertRaises(Exception, self.server.query)
self.assertEqual(querier.call_count, 3)
def test_by_address_no_srv(self):
with patch("dns.resolver.query") as query:
query.return_value = []
self.server = MinecraftServer.lookup("example.org")
query.assert_called_once_with("_minecraft._tcp.example.org", "SRV")
self.assertEqual(self.server.host, "example.org")
self.assertEqual(self.server.port, 25565)
def test_by_address_invalid_srv(self):
with patch("dns.resolver.query") as query:
query.side_effect = [Exception]
self.server = MinecraftServer.lookup("example.org")
query.assert_called_once_with("_minecraft._tcp.example.org", "SRV")
self.assertEqual(self.server.host, "example.org")
self.assertEqual(self.server.port, 25565)
def test_by_address_with_srv(self):
with patch("dns.resolver.query") as query:
answer = Mock()
answer.target = "different.example.org."
answer.port = 12345
query.return_value = [answer]
self.server = MinecraftServer.lookup("example.org")
query.assert_called_once_with("_minecraft._tcp.example.org", "SRV")
self.assertEqual(self.server.host, "different.example.org")
self.assertEqual(self.server.port, 12345)
def test_by_address_with_port(self):
self.server = MinecraftServer.lookup("example.org:12345")
self.assertEqual(self.server.host, "example.org")
self.assertEqual(self.server.port, 12345)
def test_by_address_with_multiple_ports(self):
self.assertRaises(ValueError, MinecraftServer.lookup, "example.org:12345:6789")
def test_by_address_with_invalid_port(self):
self.assertRaises(ValueError, MinecraftServer.lookup, "example.org:port")

View File

@@ -1,2 +0,0 @@
# Backwards-compatibility
from query import *

View File

@@ -1,146 +0,0 @@
import socket
import struct
class MinecraftQuery:
MAGIC_PREFIX = '\xFE\xFD'
PACKET_TYPE_CHALLENGE = 9
PACKET_TYPE_QUERY = 0
HUMAN_READABLE_NAMES = dict(
game_id = "Game Name",
gametype = "Game Type",
motd = "Message of the Day",
hostname = "Server Address",
hostport = "Server Port",
map = "Main World Name",
maxplayers = "Maximum Players",
numplayers = "Players Online",
players = "List of Players",
plugins = "List of Plugins",
raw_plugins = "Raw Plugin Info",
software = "Server Software",
version = "Game Version",
)
def __init__(self, host, port, timeout=10, id=0, retries=2):
self.addr = (host, port)
self.id = id
self.id_packed = struct.pack('>l', id)
self.challenge_packed = struct.pack('>l', 0)
self.retries = 0
self.max_retries = retries
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.socket.settimeout(timeout)
def send_raw(self, data):
self.socket.sendto(self.MAGIC_PREFIX + data, self.addr)
def send_packet(self, type, data=''):
self.send_raw(struct.pack('>B', type) + self.id_packed + self.challenge_packed + data)
def read_packet(self):
buff = self.socket.recvfrom(1460)[0]
type = struct.unpack('>B', buff[0])[0]
id = struct.unpack('>l', buff[1:5])[0]
return type, id, buff[5:]
def handshake(self, bypass_retries=False):
self.send_packet(self.PACKET_TYPE_CHALLENGE)
try:
type, id, buff = self.read_packet()
except:
if not bypass_retries:
self.retries += 1
if self.retries < self.max_retries:
self.handshake(bypass_retries=bypass_retries)
return
else:
raise
self.challenge = int(buff[:-1])
self.challenge_packed = struct.pack('>l', self.challenge)
def get_status(self):
if not hasattr(self, 'challenge'):
self.handshake()
self.send_packet(self.PACKET_TYPE_QUERY)
try:
type, id, buff = self.read_packet()
except:
self.handshake()
return self.get_status()
data = {}
data['motd'], data['gametype'], data['map'], data['numplayers'], data['maxplayers'], buff = buff.split('\x00', 5)
data['hostport'] = struct.unpack('<h', buff[:2])[0]
buff = buff[2:]
data['hostname'] = buff[:-1]
for key in ('numplayers', 'maxplayers'):
try:
data[key] = int(data[key])
except:
pass
return data
def get_rules(self):
if not hasattr(self, 'challenge'):
self.handshake()
self.send_packet(self.PACKET_TYPE_QUERY, self.id_packed)
try:
type, id, buff = self.read_packet()
except:
self.retries += 1
if self.retries < self.max_retries:
self.handshake(bypass_retries=True)
return self.get_rules()
else:
raise
data = {}
buff = buff[11:] # splitnum + 2 ints
items, players = buff.split('\x00\x00\x01player_\x00\x00') # Shamefully stole from https://github.com/barneygale/MCQuery
if items[:8] == 'hostname':
items = 'motd' + items[8:]
items = items.split('\x00')
data = dict(zip(items[::2], items[1::2]))
players = players[:-2]
if players:
data['players'] = players.split('\x00')
else:
data['players'] = []
for key in ('numplayers', 'maxplayers', 'hostport'):
try:
data[key] = int(data[key])
except:
pass
data['raw_plugins'] = data['plugins']
data['software'], data['plugins'] = self.parse_plugins(data['raw_plugins'])
return data
def parse_plugins(self, raw):
parts = raw.split(':', 1)
server = parts[0].strip()
plugins = []
if len(parts) == 2:
plugins = parts[1].split(';')
plugins = map(lambda s: s.strip(), plugins)
return server, plugins

1
requirements.txt Normal file
View File

@@ -0,0 +1 @@
-r requirements/base.txt

2
requirements/base.txt Normal file
View File

@@ -0,0 +1,2 @@
six==1.7.3
mock==1.0.1

3
requirements/python2.txt Normal file
View File

@@ -0,0 +1,3 @@
-r base.txt
dnspython==1.12.0

3
requirements/python3.txt Normal file
View File

@@ -0,0 +1,3 @@
-r base.txt
dnspython3==1.12.0

View File

@@ -1,11 +1,37 @@
from distutils.core import setup
from six import PY2
install_requires = [
'six'
]
if PY2:
install_requires.append('dnspython')
else:
install_requires.append('dnspython3')
tests_require = [
'mock'
]
setup(
name='mcstatus',
version='1.0',
version='2.0',
author='Nathan Adams',
author_email='dinnerbone@dinnerbone.com',
url='https://pypi.python.org/pypi/mcstatus',
packages=['minecraft_query',],
description='A library to query Minecraft Servers for their status and capabilities.'
packages=['mcstatus', 'mcstatus.protocol'],
description='A library to query Minecraft Servers for their status and capabilities.',
install_requires=install_requires,
tests_require=tests_require,
classifiers=[
'Development Status :: 5 - Production/Stable',
'Intended Audience :: Developers',
'License :: OSI Approved :: Apache Software License',
'Programming Language :: Python',
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3',
'Operating System :: OS Independent',
'Topic :: Software Development :: Libraries :: Python Modules',
],
)