Fix compatibility with python3.7 and a few linter things

This commit is contained in:
Kevin Tindall
2022-01-25 18:24:10 -06:00
parent 54468aae26
commit 6be73e995b
3 changed files with 17 additions and 11 deletions

View File

@@ -1,5 +1,13 @@
from abc import abstractmethod, ABC from abc import abstractmethod, ABC
from typing import SupportsBytes, Iterable, SupportsIndex, Union
try:
from typing import SupportsBytes, Iterable, SupportsIndex, Tuple, Union
BytesConvertable = Union[SupportsIndex, Iterable[SupportsIndex]]
except ImportError:
from typing import SupportsBytes, Iterable, Tuple, Union
BytesConvertable = Union[int, Iterable[int]]
import socket import socket
import struct import struct
import asyncio import asyncio
@@ -10,8 +18,6 @@ from ctypes import c_int32 as signed_int32
from ..scripts.address_tools import ip_type from ..scripts.address_tools import ip_type
BytesConvertable = Union[SupportsIndex, Iterable[SupportsIndex]]
class Connection: class Connection:
def __init__(self): def __init__(self):
@@ -108,7 +114,7 @@ class Connection:
def read_uint(self) -> int: def read_uint(self) -> int:
return self._unpack("I", self.read(4)) return self._unpack("I", self.read(4))
def write_uint(self, value: int) -> None: def write_uint(self, value: int) -> None:
self.write(self._pack("I", value)) self.write(self._pack("I", value))
def read_long(self) -> int: def read_long(self) -> int:
@@ -185,7 +191,7 @@ class AsyncReadConnection(Connection, ABC):
class TCPSocketConnection(Connection): class TCPSocketConnection(Connection):
def __init__(self, addr: tuple[str, int], timeout: float = 3): def __init__(self, addr: Tuple[str, int], timeout: float = 3):
Connection.__init__(self) Connection.__init__(self)
self.socket = socket.create_connection(addr, timeout=timeout) self.socket = socket.create_connection(addr, timeout=timeout)
self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
@@ -219,7 +225,7 @@ class TCPSocketConnection(Connection):
class UDPSocketConnection(Connection): class UDPSocketConnection(Connection):
def __init__(self, addr: tuple[str, int], timeout: float = 3): def __init__(self, addr: Tuple[str, int], timeout: float = 3):
Connection.__init__(self) Connection.__init__(self)
self.addr = addr self.addr = addr
self.socket = socket.socket( self.socket = socket.socket(
@@ -264,7 +270,7 @@ class TCPAsyncSocketConnection(AsyncReadConnection):
def __init__(self): def __init__(self):
super().__init__() super().__init__()
async def connect(self, addr: tuple[str, int], timeout: float = 3): async def connect(self, addr: Tuple[str, int], timeout: float = 3):
self.timeout = timeout self.timeout = timeout
conn = asyncio.open_connection(addr[0], addr[1]) conn = asyncio.open_connection(addr[0], addr[1])
self.reader, self.writer = await asyncio.wait_for(conn, timeout=self.timeout) self.reader, self.writer = await asyncio.wait_for(conn, timeout=self.timeout)
@@ -296,7 +302,7 @@ class UDPAsyncSocketConnection(AsyncReadConnection):
def __init__(self): def __init__(self):
super().__init__() super().__init__()
async def connect(self, addr: tuple[str, int], timeout: float = 3): async def connect(self, addr: Tuple[str, int], timeout: float = 3):
self.timeout = timeout self.timeout = timeout
conn = asyncio_dgram.connect((addr[0], addr[1])) conn = asyncio_dgram.connect((addr[0], addr[1]))
self.stream = await asyncio.wait_for(conn, timeout=self.timeout) self.stream = await asyncio.wait_for(conn, timeout=self.timeout)

View File

@@ -276,5 +276,5 @@ class UDPSocketConnectionTest:
self.connection.socket.sendto.assert_called_once_with( # type: ignore[attr-defined] self.connection.socket.sendto.assert_called_once_with( # type: ignore[attr-defined]
bytearray.fromhex("7FAA"), bytearray.fromhex("7FAA"),
("localhost", 1234) ("localhost", 1234),
) )

View File

@@ -11,7 +11,7 @@ def test_is_completely_asynchronous():
assertions = 0 assertions = 0
for attribute in dir(conn): for attribute in dir(conn):
if attribute.startswith("read_"): if attribute.startswith("read_"):
assert iscoroutinefunction(conn.__getattribute__(attribute)) assert iscoroutinefunction(getattr(conn, attribute))
assertions += 1 assertions += 1
assert assertions > 0, "None of the read_* attributes were async" assert assertions > 0, "None of the read_* attributes were async"
@@ -21,6 +21,6 @@ def test_query_is_completely_asynchronous():
assertions = 0 assertions = 0
for attribute in dir(conn): for attribute in dir(conn):
if attribute.startswith("read_"): if attribute.startswith("read_"):
assert iscoroutinefunction(conn.__getattribute__(attribute)) assert iscoroutinefunction(getattr(conn, attribute))
assertions += 1 assertions += 1
assert assertions > 0, "None of the read_* attributes were async" assert assertions > 0, "None of the read_* attributes were async"