Ad

Run Crypto Feed In A Concurrent Thread: There Is No Current Event Loop In Thread 'ThreadPoolExecutor-0_0'

- 1 answer

I'm trying to use crypto feed to download data concurrently.

f = FeedHandler()
f.add_feed(Gateio(channels=[TRADES], symbols=list_tmp, callbacks={ TRADES: TradePostgresGateio(**postgres_cfg)}))
f.run()

This code above can be run successfully. However, I am trying to run it in the background. So I am using concurrent futures to help.

executor = concurrent.futures.ThreadPoolExecutor(16)
job2 = executor.submit(f.run)

However, I got error:

job2.result()


---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
<ipython-input-54-f96e35ee3c66> in <module>
----> 1 job2.result()

~/anaconda3/lib/python3.8/concurrent/futures/_base.py in result(self, timeout)
    430                 raise CancelledError()
    431             elif self._state == FINISHED:
--> 432                 return self.__get_result()
    433 
    434             self._condition.wait(timeout)

~/anaconda3/lib/python3.8/concurrent/futures/_base.py in __get_result(self)
    386     def __get_result(self):
    387         if self._exception:
--> 388             raise self._exception
    389         else:
    390             return self._result

~/anaconda3/lib/python3.8/concurrent/futures/thread.py in run(self)
     55 
     56         try:
---> 57             result = self.fn(*self.args, **self.kwargs)
     58         except BaseException as exc:
     59             self.future.set_exception(exc)

~/anaconda3/lib/python3.8/site-packages/cryptofeed/feedhandler.py in run(self, start_loop, install_signal_handlers, exception_handler)
    145             raise ValueError(txt)
    146 
--> 147         loop = asyncio.get_event_loop()
    148         # Good to enable when debugging or without code change: export PYTHONASYNCIODEBUG=1)
    149         # loop.set_debug(True)

~/anaconda3/lib/python3.8/asyncio/events.py in get_event_loop(self)
    637 
    638         if self._local._loop is None:
--> 639             raise RuntimeError('There is no current event loop in thread %r.'
    640                                % threading.current_thread().name)
    641 

RuntimeError: There is no current event loop in thread 'ThreadPoolExecutor-0_0'.

Could anyone help me? Thanks so much!

Edit: following

def threadable():    
    f = FeedHandler()
    f.add_feed(Gateio(channels=[TRADES], symbols=list_tmp, callbacks={ TRADES: TradePostgresGateio(**postgres_cfg)}))
    f.run()


executor = concurrent.futures.ThreadPoolExecutor(16)
job2 = executor.submit(threadable)
job2.done()
job2.result()

I got the error: It seems I still got the same error about event loop... is it solvable?

RuntimeError                              Traceback (most recent call last)
<ipython-input-47-05c023dd326f> in <module>
     11 job2.done()
     12 
---> 13 job2.result()

~/anaconda3/lib/python3.8/concurrent/futures/_base.py in result(self, timeout)
    437                 raise CancelledError()
    438             elif self._state == FINISHED:
--> 439                 return self.__get_result()
    440             else:
    441                 raise TimeoutError()

~/anaconda3/lib/python3.8/concurrent/futures/_base.py in __get_result(self)
    386     def __get_result(self):
    387         if self._exception:
--> 388             raise self._exception
    389         else:
    390             return self._result

~/anaconda3/lib/python3.8/concurrent/futures/thread.py in run(self)
     55 
     56         try:
---> 57             result = self.fn(*self.args, **self.kwargs)
     58         except BaseException as exc:
     59             self.future.set_exception(exc)

<ipython-input-47-05c023dd326f> in threadable()
      2     f = FeedHandler()
      3     f.add_feed(Gateio(channels=[TRADES], symbols=list_tmp, callbacks={ TRADES: TradePostgresGateio(**postgres_cfg)}))
----> 4     f.run()
      5 
      6 

~/anaconda3/lib/python3.8/site-packages/cryptofeed/feedhandler.py in run(self, start_loop, install_signal_handlers, exception_handler)
    145             raise ValueError(txt)
    146 
--> 147         loop = asyncio.get_event_loop()
    148         # Good to enable when debugging or without code change: export PYTHONASYNCIODEBUG=1)
    149         # loop.set_debug(True)

~/anaconda3/lib/python3.8/asyncio/events.py in get_event_loop(self)
    637 
    638         if self._local._loop is None:
--> 639             raise RuntimeError('There is no current event loop in thread %r.'
    640                                % threading.current_thread().name)
    641 

RuntimeError: There is no current event loop in thread 'ThreadPoolExecutor-1_0'.
Ad

Answer

In the single-threaded version of your code, all three of these statements execute in the same thread in a simple sequential fashion:

f = FeedHandler()
f.add_feed(Gateio(channels=[TRADES], symbols=list_tmp, callbacks={ TRADES: TradePostgresGateio(**postgres_cfg)}))
f.run()

In the multithreaded version, you submit only the last line to the Executor, and therefore it will run in a secondary thread. But these statements, as far as I can tell from the code you provided, still execute in the main thread:

f = FeedHandler()
f.add_feed(Gateio(channels=[TRADES], symbols=list_tmp, callbacks={ TRADES: TradePostgresGateio(**postgres_cfg)}))

How do you know that will work? In general it would depend on the implementation details of Gateio and Feedhandler. You need to be very careful about chopping up a program into pieces to be run in different threads, especially when third-party library calls are involved. So, good luck with that.

You could try this:

def threadable():
    f = FeedHandler()
    f.add_feed(Gateio(channels=[TRADES], symbols=list_tmp, callbacks={ TRADES: TradePostgresGateio(**postgres_cfg)}))
    f.run()

...

executor = concurrent.futures.ThreadPoolExecutor(16)
job2 = executor.submit(threadable)

Then, at least, your entire sequence of steps will execute in the SAME thread.

I would be worried about those callbacks, however. They will now run in the secondary thread, and you need to understand the consequences of that. Do they interact with a user interface program? Your UI may not support multithreading.

The use of the Executor protocol is a bit weird here, since your function doesn't return a value. The Executors are most useful when they are used to aggregate returned values. You may be better off just launching the threads you need using methods in the threading module.

Ad
source: stackoverflow.com
Ad