1 Call async functions from non-async functions

import asyncio
import threading
from collections.abc import Coroutine
from types import coroutine

# Event loop shared by every fire_and_forget call; created lazily on first use.
_loop = None
# Guards the lazy creation of _loop so that concurrent first calls from
# different threads cannot each start their own loop and thread.
_loop_lock = threading.Lock()
# Strong references to tasks that are still running. The event loop only keeps
# weak references to its tasks, so without this set a task could be
# garbage-collected before it finishes.
_pending_tasks = set()


def _get_background_loop():
    """Return the shared background event loop, starting it on first use."""
    global _loop
    with _loop_lock:
        if _loop is None:
            loop = asyncio.new_event_loop()
            # Daemon thread: it does not keep the interpreter alive on exit.
            threading.Thread(target=loop.run_forever, daemon=True).start()
            _loop = loop
        return _loop


def _spawn(coro):
    """Schedule *coro* as a task and keep it referenced until it is done.

    Must run on the background loop's thread (it is invoked through
    ``call_soon_threadsafe``), which is also why ``_pending_tasks`` needs no
    lock of its own: it is only ever touched from that thread.
    """
    task = asyncio.create_task(coro)
    _pending_tasks.add(task)
    task.add_done_callback(_pending_tasks.discard)


def fire_and_forget(coro: Coroutine) -> None:
    """Run a coroutine in the background from synchronous code.

    A single event loop running in a single daemon thread is started on the
    first call and reused for all later calls.

    Args:
        coro: A coroutine object, i.e. the result of calling an ``async def``
            function (not the function itself).

    Returns:
        None. The call returns immediately; the coroutine's result is
        discarded.
    """
    _get_background_loop().call_soon_threadsafe(_spawn, coro)


async def long_task_test(task_name: str):
    """Demo coroutine: announce the start, sleep three seconds, announce the end.

    Args:
        task_name: Label appended to both printed messages.
    """
    start_message = "long_task start:" + task_name
    print(start_message)

    # Non-blocking pause: other tasks on the same loop keep running meanwhile.
    await asyncio.sleep(3)

    end_message = "long_task end:" + task_name
    print(end_message)


# Demo: schedule two background tasks without waiting for either of them.
# NOTE: fire_and_forget runs them on a daemon thread, so the interpreter has to
# stay alive (e.g. an interactive session or a longer-running program) for the
# tasks to reach their final print.
for task_name in ("111", "222"):
    fire_and_forget(long_task_test(task_name))

Output (both tasks start before either one finishes, because they run concurrently on the shared loop):

long_task start:111
long_task start:222
long_task end:111
long_task end:222
Home Page