Run Crypto Feed In A Concurrent Thread: There Is No Current Event Loop In Thread 'ThreadPoolExecutor-0_0'
I'm trying to use crypto feed to download data concurrently.
f = FeedHandler()
f.add_feed(Gateio(channels=[TRADES], symbols=list_tmp, callbacks={ TRADES: TradePostgresGateio(**postgres_cfg)}))
f.run()
This code above can be run successfully. However, I am trying to run it in the background. So I am using concurrent futures to help.
executor = concurrent.futures.ThreadPoolExecutor(16)
job2 = executor.submit(f.run)
However, I got error:
job2.result()
---------------------------------------------------------------------------
RuntimeError Traceback (most recent call last)
<ipython-input-54-f96e35ee3c66> in <module>
----> 1 job2.result()
~/anaconda3/lib/python3.8/concurrent/futures/_base.py in result(self, timeout)
430 raise CancelledError()
431 elif self._state == FINISHED:
--> 432 return self.__get_result()
433
434 self._condition.wait(timeout)
~/anaconda3/lib/python3.8/concurrent/futures/_base.py in __get_result(self)
386 def __get_result(self):
387 if self._exception:
--> 388 raise self._exception
389 else:
390 return self._result
~/anaconda3/lib/python3.8/concurrent/futures/thread.py in run(self)
55
56 try:
---> 57 result = self.fn(*self.args, **self.kwargs)
58 except BaseException as exc:
59 self.future.set_exception(exc)
~/anaconda3/lib/python3.8/site-packages/cryptofeed/feedhandler.py in run(self, start_loop, install_signal_handlers, exception_handler)
145 raise ValueError(txt)
146
--> 147 loop = asyncio.get_event_loop()
148 # Good to enable when debugging or without code change: export PYTHONASYNCIODEBUG=1)
149 # loop.set_debug(True)
~/anaconda3/lib/python3.8/asyncio/events.py in get_event_loop(self)
637
638 if self._local._loop is None:
--> 639 raise RuntimeError('There is no current event loop in thread %r.'
640 % threading.current_thread().name)
641
RuntimeError: There is no current event loop in thread 'ThreadPoolExecutor-0_0'.
Could anyone help me? Thanks so much!
Edit: following
def threadable():
f = FeedHandler()
f.add_feed(Gateio(channels=[TRADES], symbols=list_tmp, callbacks={ TRADES: TradePostgresGateio(**postgres_cfg)}))
f.run()
executor = concurrent.futures.ThreadPoolExecutor(16)
job2 = executor.submit(threadable)
job2.done()
job2.result()
I got the error: It seems I still got the same error about event loop... is it solvable?
RuntimeError Traceback (most recent call last)
<ipython-input-47-05c023dd326f> in <module>
11 job2.done()
12
---> 13 job2.result()
~/anaconda3/lib/python3.8/concurrent/futures/_base.py in result(self, timeout)
437 raise CancelledError()
438 elif self._state == FINISHED:
--> 439 return self.__get_result()
440 else:
441 raise TimeoutError()
~/anaconda3/lib/python3.8/concurrent/futures/_base.py in __get_result(self)
386 def __get_result(self):
387 if self._exception:
--> 388 raise self._exception
389 else:
390 return self._result
~/anaconda3/lib/python3.8/concurrent/futures/thread.py in run(self)
55
56 try:
---> 57 result = self.fn(*self.args, **self.kwargs)
58 except BaseException as exc:
59 self.future.set_exception(exc)
<ipython-input-47-05c023dd326f> in threadable()
2 f = FeedHandler()
3 f.add_feed(Gateio(channels=[TRADES], symbols=list_tmp, callbacks={ TRADES: TradePostgresGateio(**postgres_cfg)}))
----> 4 f.run()
5
6
~/anaconda3/lib/python3.8/site-packages/cryptofeed/feedhandler.py in run(self, start_loop, install_signal_handlers, exception_handler)
145 raise ValueError(txt)
146
--> 147 loop = asyncio.get_event_loop()
148 # Good to enable when debugging or without code change: export PYTHONASYNCIODEBUG=1)
149 # loop.set_debug(True)
~/anaconda3/lib/python3.8/asyncio/events.py in get_event_loop(self)
637
638 if self._local._loop is None:
--> 639 raise RuntimeError('There is no current event loop in thread %r.'
640 % threading.current_thread().name)
641
RuntimeError: There is no current event loop in thread 'ThreadPoolExecutor-1_0'.
Answer
In the single-threaded version of your code, all three of these statements execute in the same thread in a simple sequential fashion:
f = FeedHandler()
f.add_feed(Gateio(channels=[TRADES], symbols=list_tmp, callbacks={ TRADES: TradePostgresGateio(**postgres_cfg)}))
f.run()
In the multithreaded version, you submit only the last line to the Executor, and therefore it will run in a secondary thread. But these statements, as far as I can tell from the code you provided, still execute in the main thread:
f = FeedHandler()
f.add_feed(Gateio(channels=[TRADES], symbols=list_tmp, callbacks={ TRADES: TradePostgresGateio(**postgres_cfg)}))
How do you know that will work? In general it would depend on the implementation details of Gateio and Feedhandler. You need to be very careful about chopping up a program into pieces to be run in different threads, especially when third-party library calls are involved. So, good luck with that.
You could try this:
def threadable():
f = FeedHandler()
f.add_feed(Gateio(channels=[TRADES], symbols=list_tmp, callbacks={ TRADES: TradePostgresGateio(**postgres_cfg)}))
f.run()
...
executor = concurrent.futures.ThreadPoolExecutor(16)
job2 = executor.submit(threadable)
Then, at least, your entire sequence of steps will execute in the SAME thread.
I would be worried about those callbacks, however. They will now run in the secondary thread, and you need to understand the consequences of that. Do they interact with a user interface program? Your UI may not support multithreading.
The use of the Executor protocol is a bit weird here, since your function doesn't return a value. The Executors are most useful when they are used to aggregate returned values. You may be better off just launching the threads you need using methods in the threading
module.
Related Questions
- → What are the pluses/minuses of different ways to configure GPIOs on the Beaglebone Black?
- → Django, code inside <script> tag doesn't work in a template
- → React - Django webpack config with dynamic 'output'
- → GAE Python app - Does URL matter for SEO?
- → Put a Rendered Django Template in Json along with some other items
- → session disappears when request is sent from fetch
- → Python Shopify API output formatted datetime string in django template
- → Can't turn off Javascript using Selenium
- → WebDriver click() vs JavaScript click()
- → Shopify app: adding a new shipping address via webhook
- → Shopify + Python library: how to create new shipping address
- → shopify python api: how do add new assets to published theme?
- → Access 'HTTP_X_SHOPIFY_SHOP_API_CALL_LIMIT' with Python Shopify Module