async semaphore 装饰器 | python代码速记 | python 技术论坛-金年会app官方网
from functools import wraps
import asyncio
import inspect
def async_sem(conn):
def ret(fn):
sem = asyncio.semaphore(conn)
@wraps(fn)
async def _(*args, **kwargs):
async with sem:
await fn(*args, **kwargs)
return _
if inspect.iscoroutinefunction(conn):
fn = conn
conn = 20
return ret(fn)
return ret
@async_sem
async def test():
print("test")
await asyncio.sleep(1)
print("test don")
@async_sem(2)
async def test2():
print("test2")
await asyncio.sleep(0.5)
print("test2 don")
async def main():
await asyncio.gather(test(), test2())
asyncio.run(main())
謎麟