Use bytearray more efficiently, better python 3.1 support

This commit is contained in:
Nathan Adams
2014-09-04 23:21:52 +02:00
parent 060e52118f
commit e82a44f580
5 changed files with 63 additions and 57 deletions

View File

@@ -15,10 +15,14 @@ class Connection:
return result
def write(self, data):
self.sent += data
if not isinstance(data, bytearray):
data = bytearray(data)
self.sent.extend(data)
def receive(self, data):
self.received += data
if not isinstance(data, bytearray):
data = bytearray(data)
self.received.extend(data)
def remaining(self):
return len(self.received)

View File

@@ -1,5 +1,5 @@
import json
import socket
from mcstatus.pinger import ServerPinger
from mcstatus.protocol.connection import TCPSocketConnection

View File

@@ -1,5 +1,5 @@
import socket
from unittest import TestCase
from mock import Mock, patch
from mcstatus.protocol.connection import Connection, TCPSocketConnection
@@ -10,57 +10,57 @@ class TestConnection(TestCase):
self.connection = Connection()
def test_flush(self):
self.connection.sent = "\x7F\xAA\xBB"
self.connection.sent = bytearray.fromhex("7FAABB")
self.assertEqual(self.connection.flush(), "\x7F\xAA\xBB")
self.assertEqual(self.connection.flush(), bytearray.fromhex("7FAABB"))
self.assertTrue(self.connection.sent == "")
def test_receive(self):
self.connection.receive("\x7F")
self.connection.receive("\xAA\xBB")
self.connection.receive(bytearray.fromhex("7F"))
self.connection.receive(bytearray.fromhex("AABB"))
self.assertEqual(self.connection.received, "\x7F\xAA\xBB")
self.assertEqual(self.connection.received, bytearray.fromhex("7FAABB"))
def test_remaining(self):
self.connection.receive("\x7F")
self.connection.receive("\xAA\xBB")
self.connection.receive(bytearray.fromhex("7F"))
self.connection.receive(bytearray.fromhex("AABB"))
self.assertEqual(self.connection.remaining(), 3)
def test_send(self):
self.connection.write("\x7F")
self.connection.write("\xAA\xBB")
self.connection.write(bytearray.fromhex("7F"))
self.connection.write(bytearray.fromhex("AABB"))
self.assertEqual(self.connection.flush(), "\x7F\xAA\xBB")
self.assertEqual(self.connection.flush(), bytearray.fromhex("7FAABB"))
def test_read(self):
self.connection.receive("\x7F\xAA\xBB")
self.connection.receive(bytearray.fromhex("7FAABB"))
self.assertEqual(self.connection.read(2), "\x7F\xAA")
self.assertEqual(self.connection.read(1), "\xBB")
self.assertEqual(self.connection.read(2), bytearray.fromhex("7FAA"))
self.assertEqual(self.connection.read(1), bytearray.fromhex("BB"))
def test_readSimpleVarInt(self):
self.connection.receive("\x0F")
self.connection.receive(bytearray.fromhex("0F"))
self.assertEqual(self.connection.read_varint(), 15)
def test_writeSimpleVarInt(self):
self.connection.write_varint(15)
self.assertEqual(self.connection.flush(), "\x0F")
self.assertEqual(self.connection.flush(), bytearray.fromhex("0F"))
def test_readBigVarInt(self):
self.connection.receive("\xFF\xFF\xFF\xFF\x7F")
self.connection.receive(bytearray.fromhex("FFFFFFFF7F"))
self.assertEqual(self.connection.read_varint(), 34359738367)
def test_writeBigVarInt(self):
self.connection.write_varint(2147483647)
self.assertEqual(self.connection.flush(), "\xFF\xFF\xFF\xFF\x07")
self.assertEqual(self.connection.flush(), bytearray.fromhex("FFFFFFFF07"))
def test_readInvalidVarInt(self):
self.connection.receive("\xFF\xFF\xFF\xFF\x80")
self.connection.receive(bytearray.fromhex("FFFFFFFF80"))
self.assertRaises(IOError, self.connection.read_varint)
@@ -68,93 +68,93 @@ class TestConnection(TestCase):
self.assertRaises(ValueError, self.connection.write_varint, 34359738368)
def test_readString(self):
self.connection.receive("\x0D\x48\x65\x6C\x6C\x6F\x2C\x20\x77\x6F\x72\x6C\x64\x21")
self.connection.receive(bytearray.fromhex("0D48656C6C6F2C20776F726C6421"))
self.assertEqual(self.connection.read_utf(), "Hello, world!")
def test_writeString(self):
self.connection.write_utf("Hello, world!")
self.assertEqual(self.connection.flush(), "\x0D\x48\x65\x6C\x6C\x6F\x2C\x20\x77\x6F\x72\x6C\x64\x21")
self.assertEqual(self.connection.flush(), bytearray.fromhex("0D48656C6C6F2C20776F726C6421"))
def test_readEmptyString(self):
self.connection.write_utf("")
self.assertEqual(self.connection.flush(), "\x00")
self.assertEqual(self.connection.flush(), bytearray.fromhex("00"))
def test_readShortNegative(self):
self.connection.receive("\x80\x00")
self.connection.receive(bytearray.fromhex("8000"))
self.assertEqual(self.connection.read_short(), -32768)
def test_writeShortNegative(self):
self.connection.write_short(-32768)
self.assertEqual(self.connection.flush(), "\x80\x00")
self.assertEqual(self.connection.flush(), bytearray.fromhex("8000"))
def test_readShortPositive(self):
self.connection.receive("\x7F\xFF")
self.connection.receive(bytearray.fromhex("7FFF"))
self.assertEqual(self.connection.read_short(), 32767)
def test_writeShortPositive(self):
self.connection.write_short(32767)
self.assertEqual(self.connection.flush(), "\x7F\xFF")
self.assertEqual(self.connection.flush(), bytearray.fromhex("7FFF"))
def test_readUShortPositive(self):
self.connection.receive("\x80\x00")
self.connection.receive(bytearray.fromhex("8000"))
self.assertEqual(self.connection.read_ushort(), 32768)
def test_writeUShortPositive(self):
self.connection.write_ushort(32768)
self.assertEqual(self.connection.flush(), "\x80\x00")
self.assertEqual(self.connection.flush(), bytearray.fromhex("8000"))
def test_readLongNegative(self):
self.connection.receive("\x80\x00\x00\x00\x00\x00\x00\x00")
self.connection.receive(bytearray.fromhex("8000000000000000"))
self.assertEqual(self.connection.read_long(), -9223372036854775808)
def test_writeLongNegative(self):
self.connection.write_long(-9223372036854775808)
self.assertEqual(self.connection.flush(), "\x80\x00\x00\x00\x00\x00\x00\x00")
self.assertEqual(self.connection.flush(), bytearray.fromhex("8000000000000000"))
def test_readLongPositive(self):
self.connection.receive("\x7F\xFF\xFF\xFF\xFF\xFF\xFF\xFF")
self.connection.receive(bytearray.fromhex("7FFFFFFFFFFFFFFF"))
self.assertEqual(self.connection.read_long(), 9223372036854775807)
def test_writeLongPositive(self):
self.connection.write_long(9223372036854775807)
self.assertEqual(self.connection.flush(), "\x7F\xFF\xFF\xFF\xFF\xFF\xFF\xFF")
self.assertEqual(self.connection.flush(), bytearray.fromhex("7FFFFFFFFFFFFFFF"))
def test_readULongPositive(self):
self.connection.receive("\x80\x00\x00\x00\x00\x00\x00\x00")
self.connection.receive(bytearray.fromhex("8000000000000000"))
self.assertEqual(self.connection.read_ulong(), 9223372036854775808)
def test_writeULongPositive(self):
self.connection.write_ulong(9223372036854775808)
self.assertEqual(self.connection.flush(), "\x80\x00\x00\x00\x00\x00\x00\x00")
self.assertEqual(self.connection.flush(), bytearray.fromhex("8000000000000000"))
def test_readBuffer(self):
self.connection.receive("\x02\x7F\xAA")
self.connection.receive(bytearray.fromhex("027FAA"))
buffer = self.connection.read_buffer()
self.assertEqual(buffer.received, "\x7F\xAA")
self.assertEqual(buffer.received, bytearray.fromhex("7FAA"))
self.assertEqual(self.connection.flush(), "")
def test_writeBuffer(self):
buffer = Connection()
buffer.write("\x7F\xAA")
buffer.write(bytearray.fromhex("7FAA"))
self.connection.write_buffer(buffer)
self.assertEqual(self.connection.flush(), "\x02\x7F\xAA")
self.assertEqual(self.connection.flush(), bytearray.fromhex("027FAA"))
class TCPSocketConnectionTest(TestCase):
def setUp(self):
@@ -175,11 +175,11 @@ class TCPSocketConnectionTest(TestCase):
self.assertRaises(TypeError, self.connection.remaining)
def test_read(self):
self.connection.socket.recv.return_value = "\x7F\xAA"
self.connection.socket.recv.return_value = bytearray.fromhex("7FAA")
self.assertEqual(self.connection.read(2), "\x7F\xAA")
self.assertEqual(self.connection.read(2), bytearray.fromhex("7FAA"))
def test_write(self):
self.connection.write("\x7F\xAA")
self.connection.write(bytearray.fromhex("7FAA"))
self.connection.socket.send.assert_called_once_with("\x7F\xAA")
self.connection.socket.send.assert_called_once_with(bytearray.fromhex("7FAA"))

View File

@@ -11,34 +11,34 @@ class TestServerPinger(TestCase):
def test_handshake(self):
self.pinger.handshake()
self.assertEqual(self.pinger.connection.flush(), "\x0F\x00\x2C\x09\x6C\x6F\x63\x61\x6C\x68\x6F\x73\x74\x63\xDD\x01")
self.assertEqual(self.pinger.connection.flush(), bytearray.fromhex("0F002C096C6F63616C686F737463DD01"))
def test_read_status(self):
self.pinger.connection.receive("\x72\x00\x70\x7B\x22\x64\x65\x73\x63\x72\x69\x70\x74\x69\x6F\x6E\x22\x3A\x22\x41\x20\x4D\x69\x6E\x65\x63\x72\x61\x66\x74\x20\x53\x65\x72\x76\x65\x72\x22\x2C\x22\x70\x6C\x61\x79\x65\x72\x73\x22\x3A\x7B\x22\x6D\x61\x78\x22\x3A\x32\x30\x2C\x22\x6F\x6E\x6C\x69\x6E\x65\x22\x3A\x30\x7D\x2C\x22\x76\x65\x72\x73\x69\x6F\x6E\x22\x3A\x7B\x22\x6E\x61\x6D\x65\x22\x3A\x22\x31\x2E\x38\x2D\x70\x72\x65\x31\x22\x2C\x22\x70\x72\x6F\x74\x6F\x63\x6F\x6C\x22\x3A\x34\x34\x7D\x7D")
self.pinger.connection.receive(bytearray.fromhex("7200707B226465736372697074696F6E223A2241204D696E65637261667420536572766572222C22706C6179657273223A7B226D6178223A32302C226F6E6C696E65223A307D2C2276657273696F6E223A7B226E616D65223A22312E382D70726531222C2270726F746F636F6C223A34347D7D"))
self.assertEqual(self.pinger.read_status(), '{"description":"A Minecraft Server","players":{"max":20,"online":0},"version":{"name":"1.8-pre1","protocol":44}}')
self.assertEqual(self.pinger.connection.flush(), "\x01\x00")
self.assertEqual(self.pinger.connection.flush(), bytearray.fromhex("0100"))
def test_read_status_invalid(self):
self.pinger.connection.receive("\x01\x05")
self.pinger.connection.receive(bytearray.fromhex("0105"))
self.assertRaises(IOError, self.pinger.read_status)
def test_test_ping(self):
self.pinger.connection.receive("\x09\x01\x00\x00\x00\x00\x00\xDD\x7D\x1C")
self.pinger.connection.receive(bytearray.fromhex("09010000000000DD7D1C"))
self.pinger.ping_token = 14515484
self.assertTrue(self.pinger.test_ping() >= 0)
self.assertEqual(self.pinger.connection.flush(), "\x09\x01\x00\x00\x00\x00\x00\xDD\x7D\x1C")
self.assertEqual(self.pinger.connection.flush(), bytearray.fromhex("09010000000000DD7D1C"))
def test_test_ping_invalid(self):
self.pinger.connection.receive("\x01\x1F")
self.pinger.connection.receive(bytearray.fromhex("011F"))
self.pinger.ping_token = 14515484
self.assertRaises(IOError, self.pinger.test_ping)
def test_test_ping_wrong_token(self):
self.pinger.connection.receive("\x09\x01\x00\x00\x00\x00\x00\xDD\x7D\x1C")
self.pinger.connection.receive(bytearray.fromhex("09010000000000DD7D1C"))
self.pinger.ping_token = 12345
self.assertRaises(IOError, self.pinger.test_ping)

View File

@@ -1,5 +1,7 @@
from unittest import TestCase
from mock import Mock, patch
from mock import patch
from mcstatus.protocol.connection import Connection
from mcstatus.server import MinecraftServer
@@ -10,19 +12,19 @@ class TestMinecraftServer(TestCase):
self.server = MinecraftServer("localhost", port=25565)
def test_ping_server(self):
self.socket.receive("\x6D\x00\x6B\x7B\x22\x64\x65\x73\x63\x72\x69\x70\x74\x69\x6F\x6E\x22\x3A\x22\x41\x20\x4D\x69\x6E\x65\x63\x72\x61\x66\x74\x20\x53\x65\x72\x76\x65\x72\x22\x2C\x22\x70\x6C\x61\x79\x65\x72\x73\x22\x3A\x7B\x22\x6D\x61\x78\x22\x3A\x32\x30\x2C\x22\x6F\x6E\x6C\x69\x6E\x65\x22\x3A\x30\x7D\x2C\x22\x76\x65\x72\x73\x69\x6F\x6E\x22\x3A\x7B\x22\x6E\x61\x6D\x65\x22\x3A\x22\x31\x2E\x38\x22\x2C\x22\x70\x72\x6F\x74\x6F\x63\x6F\x6C\x22\x3A\x34\x37\x7D\x7D\x09\x01\x00\x00\x00\x00\x01\xC5\x42\x46")
self.socket.receive(bytearray.fromhex("6D006B7B226465736372697074696F6E223A2241204D696E65637261667420536572766572222C22706C6179657273223A7B226D6178223A32302C226F6E6C696E65223A307D2C2276657273696F6E223A7B226E616D65223A22312E38222C2270726F746F636F6C223A34377D7D09010000000001C54246"))
with patch("mcstatus.server.TCPSocketConnection") as connection:
connection.return_value = self.socket
info = self.server.ping_server(ping_token=29704774, version=47)
self.assertEqual(self.socket.flush(), "\x0F\x00\x2F\x09\x6C\x6F\x63\x61\x6C\x68\x6F\x73\x74\x63\xDD\x01\x01\x00\x09\x01\x00\x00\x00\x00\x01\xC5\x42\x46")
self.assertEqual(self.socket.flush(), bytearray.fromhex("0F002F096C6F63616C686F737463DD01010009010000000001C54246"))
self.assertEqual(self.socket.remaining(), 0, msg="Data is pending to be read, but should be empty")
self.assertEqual(info["status"], {"description":"A Minecraft Server","players":{"max":20,"online":0},"version":{"name":"1.8","protocol":47}})
self.assertTrue(info["latency"] >= 0)
def test_ping_server_invalid_json(self):
self.socket.receive("\x6D\x00\x6B\x7C\x22\x64\x65\x73\x63\x72\x69\x70\x74\x69\x6F\x6E\x22\x3A\x22\x41\x20\x4D\x69\x6E\x65\x63\x72\x61\x66\x74\x20\x53\x65\x72\x76\x65\x72\x22\x2C\x22\x70\x6C\x61\x79\x65\x72\x73\x22\x3A\x7B\x22\x6D\x61\x78\x22\x3A\x32\x30\x2C\x22\x6F\x6E\x6C\x69\x6E\x65\x22\x3A\x30\x7D\x2C\x22\x76\x65\x72\x73\x69\x6F\x6E\x22\x3A\x7B\x22\x6E\x61\x6D\x65\x22\x3A\x22\x31\x2E\x38\x22\x2C\x22\x70\x72\x6F\x74\x6F\x63\x6F\x6C\x22\x3A\x34\x37\x7D\x7D\x09\x01\x00\x00\x00\x00\x01\xC5\x42\x46")
self.socket.receive(bytearray.fromhex("6D006B7C226465736372697074696F6E223A2241204D696E65637261667420536572766572222C22706C6179657273223A7B226D6178223A32302C226F6E6C696E65223A307D2C2276657273696F6E223A7B226E616D65223A22312E38222C2270726F746F636F6C223A34377D7D09010000000001C54246"))
with patch("mcstatus.server.TCPSocketConnection") as connection:
connection.return_value = self.socket