Skip to content
Snippets Groups Projects

Add Memory Stop Listener

Merged Rainer Kartmann requested to merge memory-stop-listener into master
12 files
+ 388
14
Compare changes
  • Side-by-side
  • Inline
Files
12
+ 37
0
import abc
import typing as ty
Client = ty.TypeVar("Client")
class ReconnectingProxyClient(abc.ABC):
def __init__(
self,
):
self._client: ty.Optional[Client] = None
def _get(
self,
wait=True,
log=None,
) -> ty.Optional[Client]:
self._connect(wait=wait, log=log)
return self._client
def _connect(self, wait=True, log=None):
if self._client is None:
if wait:
self.client = self._wait_for_client()
else:
self.client = self._get_client()
@abc.abstractmethod
def _wait_for_client(self) -> Client:
...
@abc.abstractmethod
def _get_client(self) -> ty.Optional[Client]:
...
Loading