Source code for pyTCP.async_client
import asyncio
import logging
import time
import async_timeout
from .client_errors import ClientTimeoutError
[docs]class AsyncTcpClient:
"""Asynchronous tcp client
Attributes
----------
host : str
The ip address of the tcp server.
port : int
The port of the tcp server.
reader :obj:
Instance of the StreamReader
writer :obj:
Instance of the StreamWriter
auto_reconnect : bool
If true, a reconnect will be made on connection loss.
logger : :obj:
An instance of the logging module.
buffer : bool, default=True
The split part of the msg which was not returned.
"""
def __init__(self, host: str = "127.0.0.1", port: int = 8080, auto_reconnect: bool = True):
"""The constructor.
Parameters
----------
loop : :obj:
The asyncio event loop.
host : str, default="127.0.0.1"
The ip address of the tcp server.
port : int, default=8080
The port of the tcp server.
auto_reconnect : bool, default=True
If true, a reconnect will be made on connection loss.
"""
self.host = host
self.port = port
self.reader = None
self.writer = None
self._connected = False
self.auto_reconnect = auto_reconnect
self.logger = logging.getLogger(__name__)
self.buffer = []
@property
def is_connected(self):
"""bool: Returns True if connected."""
return self._connected
[docs] async def connect(self, timeout: float = 10.0):
""" Tries to connect to the given host. Waits 0.5 seconds until another try will be made.
Parameters
----------
timeout : float, default 10.0
The maximum time this function will try to connect until a ClientTimeoutError is raised.
Raises
------
ClientTimeoutError
If no connection could be established in the given time a ClientTimeoutError is raised.
"""
timeout_start = time.time()
while not self._connected and time.time() < timeout_start + timeout:
try:
with async_timeout.timeout(timeout):
self.reader, self.writer = await asyncio.open_connection(self.host, self.port)
self._connected = True
return
except ConnectionRefusedError:
self.logger.error("error creating a connection, trying again ... ")
await asyncio.sleep(0.5)
continue
raise ClientTimeoutError("timeout while connecting")
[docs] async def send(self, data: bytes):
""" Send a message to the socket. If an socket.error is raised and auto_connect is enabled,
a reconnect will be executed.
Parameters
----------
data : bytes
Sends the given bytes to the socket.
"""
if not self._connected:
return
try:
self.writer.write(data)
except ConnectionRefusedError:
self._connected = False
self.logger.error("error send data")
if self.auto_reconnect:
self.logger.error("reconnecting ...")
await self.connect()
[docs] async def receive(self, bytes_to_receive: int = 4096) -> bytes:
""" Receives messages from the socket. If an socket.error is raised and auto_connect is enabled,
a reconnect will be executed, otherwise an empty byte string will be returned.
Parameters
----------
bytes_to_receive : int, default 4096
Reads the number bytes from the socket. Returns fewer bytes than bytes_to_receive if fewer are available.
Returns
-------
bytes
The received data from the socket. Or an empty byte string if socket.error is raised.
"""
if not self._connected:
return b''
try:
data = await self.reader.read(bytes_to_receive)
return data
except ConnectionRefusedError:
self._connected = False
self.logger.error("error receiving data")
if self.auto_reconnect:
self.logger.error("reconnecting ...")
await self.connect()
return b''
[docs] async def receive_until(self, bytes_to_receive: int = 4096, delimiter: bytes = '\n', timeout: float = 1.0) -> bytes:
""" Receives messages from the socket until the given delimiter is recognized.
The data will be split at the delimiter. The delimiter will be removed from the message and returned.
If the received message contains a message after the delimiter, it will be stored in a buffer
and prepended to the next message.
If an socket.error is raised and auto_connect is enabled,
a reconnect will be executed, otherwise an empty byte string will be returned.
Parameters
----------
bytes_to_receive : int, default 4096
Reads the number bytes from the socket. Returns fewer bytes than bytes_to_receive if fewer are available.
delimiter : bytes, default '\\n'
Splits the read data at the delimiter
timeout : float, default 1.0
The maximum time this function will wait until a ClientTimeoutError is raised.
Returns
-------
bytes
The received data from the socket. Or an empty byte string if socket.error is raised.
Raises
------
ClientTimeoutError
Raises if no data was read or no delimiter was found withing the given time.
"""
timeout_start = time.time()
while time.time() < timeout_start + timeout:
chunk = await self.receive(bytes_to_receive)
if not chunk:
break
if delimiter not in chunk:
self.buffer.append(chunk)
continue
data_list = chunk.split(delimiter)
self.buffer.append(data_list[0])
ret = self.buffer.copy()
self.buffer = [data_list[1]]
return b''.join(ret)
raise ClientTimeoutError("timeout while receiving data")
[docs] def close(self):
""" Closes the socket connection if it open
"""
if not self._connected:
return
self._connected = False
self.writer.close()