How Do I Add A Timeout To Multiprocessing.connection.Client(..) In Python 3.7?
I've got two Python programs running. Program A connects to program B with the multiprocessing module:
# Connection code in program A # ----------------------------- import multiprocessing import multiprocessing.connection ... connection = multiprocessing.connection.Client( ('localhost', 19191), # <- address of program B authkey='embeetle'.encode('utf-8') # <- authorization key ) ... connection.send(send_data) recv_data = connection.recv()
It works perfectly most of the time. However, sometimes program B is frozen (the details don't matter much, but it usually happens when the GUI from program B spawns a modal window).
While program B is frozen, program A hangs at the following line:
connection = multiprocessing.connection.Client( ('localhost', 19191), # <- address of program B authkey='embeetle'.encode('utf-8') # <- authorization key )
It keeps waiting for a response. I would like to put a timeout parameter, but the call to
multiprocessing.connection.Client(..) does not have one.
How can I implement a timeout here?
I'm working on a
Windows 10 computer with
I would like to put a timeout parameter, but the call to
multiprocessing.connection.Client(..)does not have one. How can I implement a timeout here?
Looking at the source to multiprocessing.connection in Python 3.7, the
Client() function is a fairly brief wrapper around
SocketClient() for your use case, which in turn wraps
At first it looked fairly straightforward to write a
ClientWithTimeout wrapper that does the same thing, but additionally calls
settimeout() on the socket it creates for the connection. However, this does not have the correct effect, because:
Python implements its own socket timeout behaviour by using
select()and an underlying non-blocking OS socket; this behaviour is what is configured by
Connectionoperates directly on an OS socket handle, which is returned by calling
detach()on the normal Python socket object.
Since Python has set the OS socket handle to the non-blocking mode,
recv()calls on it return immediately rather than waiting for the timeout period.
However, we can still set a receive timeout on the underlying OS socket handle by using the low-level
SO_RCVTIMEO socket option.
Hence the second version of my solution:
from multiprocessing.connection import Connection, answer_challenge, deliver_challenge import socket, struct def ClientWithTimeout(address, authkey, timeout): with socket.socket(socket.AF_INET) as s: s.setblocking(True) s.connect(address) # We'd like to call s.settimeout(timeout) here, but that won't work. # Instead, prepare a C "struct timeval" to specify timeout. Note that # these field sizes may differ by platform. seconds = int(timeout) microseconds = int((timeout - seconds) * 1e6) timeval = struct.pack("@LL", seconds, microseconds) # And then set the SO_RCVTIMEO (receive timeout) option with this. s.setsockopt(socket.SOL_SOCKET, socket.SO_RCVTIMEO, timeval) # Now create the connection as normal. c = Connection(s.detach()) # The following code will now fail if a socket timeout occurs. answer_challenge(c, authkey) deliver_challenge(c, authkey) return c
For brevity, I have assumed the parameters are as per your example, i.e.:
- address is a tuple (implying address family is
- authkey is a byte string.
If you need to handle cases where these assumptions don't hold then you will need to copy a little more logic from
Although I looked at the
multiprocessing.connection source to find out how to do this, my solution does not use any private implementation details.
deliver_challenge are all public and documented parts of the API. This function should therefore be be safe to use with future versions of
SO_RCVTIMEO may not be supported on all platforms, but it is present on at least Windows, Linux and OSX. The format of
struct timeval is also platform-specific. I have assumed that the two fields are always of the native
unsigned long type. I think this should be correct on common platforms but it is not guaranteed to always be so. Unfortunately Python does not currently provide a platform-independent way to do this.
Below is a test program which shows this working - it assumes the above code is saved as
from multiprocessing.connection import Client, Listener from client_timeout import ClientWithTimeout from threading import Thread from time import time, sleep addr = ('localhost', 19191) key = 'embeetle'.encode('utf-8') # Provide a listener which either does or doesn't accept connections. class ListenerThread(Thread): def __init__(self, accept): Thread.__init__(self) self.accept = accept def __enter__(self): if self.accept: print("Starting listener, accepting connections") else: print("Starting listener, not accepting connections") self.active = True self.start() sleep(0.1) def run(self): listener = Listener(addr, authkey=key) self.active = True if self.accept: listener.accept() while self.active: sleep(0.1) listener.close() def __exit__(self, exc_type, exc_val, exc_tb): self.active = False self.join() print("Stopped listener") return True for description, accept, name, function in [ ("ClientWithTimeout succeeds when the listener accepts connections.", True, "ClientWithTimeout", lambda: ClientWithTimeout(addr, timeout=3, authkey=key)), ("ClientWithTimeout fails after 3s when listener doesn't accept connections.", False, "ClientWithTimeout", lambda: ClientWithTimeout(addr, timeout=3, authkey=key)), ("Client succeeds when the listener accepts connections.", True, "Client", lambda: Client(addr, authkey=key)), ("Client hangs when the listener doesn't accept connections (use ctrl-C to stop).", False, "Client", lambda: Client(addr, authkey=key))]: print("Expected result:", description) with ListenerThread(accept): start_time = time() try: print("Creating connection using %s... " % name) client = function() print("Client created:", client) except Exception as e: print("Failed:", e) print("Time elapsed: %f seconds" % (time() - start_time)) print()
Running this on Linux produces the following output:
Expected result: ClientWithTimeout succeeds when the listener accepts connections. Starting listener, accepting connections Creating connection using ClientWithTimeout... Client created: <multiprocessing.connection.Connection object at 0x7fad536884e0> Time elapsed: 0.003276 seconds Stopped listener Expected result: ClientWithTimeout fails after 3s when listener doesn't accept connections. Starting listener, not accepting connections Creating connection using ClientWithTimeout... Failed: [Errno 11] Resource temporarily unavailable Time elapsed: 3.157268 seconds Stopped listener Expected result: Client succeeds when the listener accepts connections. Starting listener, accepting connections Creating connection using Client... Client created: <multiprocessing.connection.Connection object at 0x7fad53688c50> Time elapsed: 0.001957 seconds Stopped listener Expected result: Client hangs when the listener doesn't accept connections (use ctrl-C to stop). Starting listener, not accepting connections Creating connection using Client... ^C Stopped listener
- → What are the pluses/minuses of different ways to configure GPIOs on the Beaglebone Black?
- → Django, code inside <script> tag doesn't work in a template
- → React - Django webpack config with dynamic 'output'
- → GAE Python app - Does URL matter for SEO?
- → Put a Rendered Django Template in Json along with some other items
- → session disappears when request is sent from fetch
- → Python Shopify API output formatted datetime string in django template
- → Shopify app: adding a new shipping address via webhook
- → Shopify + Python library: how to create new shipping address
- → shopify python api: how do add new assets to published theme?
- → Access 'HTTP_X_SHOPIFY_SHOP_API_CALL_LIMIT' with Python Shopify Module