import asyncio
import selectors
from threading import Timer
from common.DanMu import get_dan_mu
def fun_timer():
"""
创建事件循环调用弹幕处理接口
:return:
"""
selector = selectors.SelectSelector()
loop = asyncio.SelectorEventLoop(selector)
get_dan_mu(loop)
loop.stop()
loop.run_forever()
loop.close()
class LoopTimer(Timer):
def __init__(self, interval, function, args=[], kwargs={}):
Timer.__init__(self, interval, function, args, kwargs)
def run(self):
while True:
self.finished.wait(self.interval)
if self.finished.is_set():
self.finished.set()
break
self.function(*self.args, **self.kwargs)
if name == 'main':
t = LoopTimer(1, fun_timer)
t.start()
import asyncio
import json
import time
import danmaku
async def send_dan_mu(q, data):
try:
while True:
m = await q.get()
if m['msg_type'] == 'danmaku':
time_local = time.localtime(time.time())
# 转换成新的时间格式(2016-05-05 20:28:54)
dt = time.strftime("%Y-%m-%d %H:%M:%S", time_local)
data['name'] = m["name"]
data['content'] = m["content"]
data['dt'] = dt
# mq配置信息
msg = json.dumps(data, ensure_ascii=False)
print('弹幕信息发送到MQ成功:{}'.format(msg))
except Exception as e:
print('弹幕信息发送到MQ异常信息:{}'.format(e))
pass
async def dan_mu_server(data):
q = asyncio.Queue()
dmc = danmaku.DanMaKuClient(data['url'], q)
asyncio.create_task(send_dan_mu(q, data))
await dmc.start()
def get_dan_mu(loop):
data_all = [(666, 111, 'https://www.huya.com/688')]
if data_all:
tasks = []
for each in data_all:
data = {}
data['a'] = each[0]
data['b'] = each[1]
data['url'] = each[2]
print('弹幕房间信息:{}'.format(data))
q = asyncio.Queue()
dmc = danmaku.DanmakuClient(data['url'], q)
task = loop.create_task(send_dan_mu(data))
await dmc.start()
tasks.append(task)
loop.run_until_complete(asyncio.wait(tasks, timeout=60))
if name == 'main':
get_dan_mu()
然后报错信息:
Task was destroyed but it is pending!
task: <Task pending coro=<dan_mu_server() running at D:\workspace\pandora-live-platform\common\DanMu.py:45> wait_for=<_GatheringFuture pending cb=[<TaskWakeupMethWrapper object at 0x0000023C95F4A7F8>()]>>
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x0000023C93636988>
Task was destroyed but it is pending!
task: <Task pending coro=<send_dan_mu() running at D:\workspace\pandora-live-platform\common\DanMu.py:23> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x0000023C951A8A68>()]>>
[ERROR] [2020-10-31 12:37:20,837] "D:\workspace\pandora-live-platform\common\DanMu.py" [send_dan_mu:37] - "弹幕信息发送到MQ异常信息:Event loop is closed"
Task was destroyed but it is pending!
task: <Task pending coro=<dan_mu_server() running at D:\workspace\pandora-live-platform\common\DanMu.py:45> wait_for=<_GatheringFuture pending cb=[<TaskWakeupMethWrapper object at 0x0000023C95FB7438>()]>>
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x0000023C95FA0908>
Task was destroyed but it is pending!
task: <Task pending coro=<send_dan_mu() running at D:\workspace\pandora-live-platform\common\DanMu.py:23> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x0000023C95FB7A98>()]>>
[ERROR] [2020-10-31 12:37:20,843] "D:\workspace\pandora-live-platform\common\DanMu.py" [send_dan_mu:37] - "弹幕信息发送到MQ异常信息:Event loop is closed"
import asyncio
import selectors
from threading import Timer
from common.DanMu import get_dan_mu
def fun_timer():
"""
创建事件循环调用弹幕处理接口
:return:
"""
selector = selectors.SelectSelector()
loop = asyncio.SelectorEventLoop(selector)
get_dan_mu(loop)
loop.stop()
loop.run_forever()
loop.close()
class LoopTimer(Timer):
if name == 'main':
t = LoopTimer(1, fun_timer)
t.start()
import asyncio
import json
import time
import danmaku
async def send_dan_mu(q, data):
try:
while True:
m = await q.get()
if m['msg_type'] == 'danmaku':
time_local = time.localtime(time.time())
# 转换成新的时间格式(2016-05-05 20:28:54)
dt = time.strftime("%Y-%m-%d %H:%M:%S", time_local)
data['name'] = m["name"]
data['content'] = m["content"]
data['dt'] = dt
# mq配置信息
msg = json.dumps(data, ensure_ascii=False)
print('弹幕信息发送到MQ成功:{}'.format(msg))
except Exception as e:
print('弹幕信息发送到MQ异常信息:{}'.format(e))
pass
async def dan_mu_server(data):
q = asyncio.Queue()
dmc = danmaku.DanMaKuClient(data['url'], q)
asyncio.create_task(send_dan_mu(q, data))
await dmc.start()
def get_dan_mu(loop):
data_all = [(666, 111, 'https://www.huya.com/688')]
if data_all:
tasks = []
for each in data_all:
data = {}
data['a'] = each[0]
data['b'] = each[1]
data['url'] = each[2]
print('弹幕房间信息:{}'.format(data))
q = asyncio.Queue()
dmc = danmaku.DanmakuClient(data['url'], q)
task = loop.create_task(send_dan_mu(data))
await dmc.start()
tasks.append(task)
loop.run_until_complete(asyncio.wait(tasks,
timeout=60))if name == 'main':
get_dan_mu()
然后报错信息:
Task was destroyed but it is pending!
task: <Task pending coro=<dan_mu_server() running at D:\workspace\pandora-live-platform\common\DanMu.py:45> wait_for=<_GatheringFuture pending cb=[<TaskWakeupMethWrapper object at 0x0000023C95F4A7F8>()]>>
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x0000023C93636988>
Task was destroyed but it is pending!
task: <Task pending coro=<send_dan_mu() running at D:\workspace\pandora-live-platform\common\DanMu.py:23> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x0000023C951A8A68>()]>>
[ERROR] [2020-10-31 12:37:20,837] "D:\workspace\pandora-live-platform\common\DanMu.py" [send_dan_mu:37] - "弹幕信息发送到MQ异常信息:Event loop is closed"
Task was destroyed but it is pending!
task: <Task pending coro=<dan_mu_server() running at D:\workspace\pandora-live-platform\common\DanMu.py:45> wait_for=<_GatheringFuture pending cb=[<TaskWakeupMethWrapper object at 0x0000023C95FB7438>()]>>
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x0000023C95FA0908>
Task was destroyed but it is pending!
task: <Task pending coro=<send_dan_mu() running at D:\workspace\pandora-live-platform\common\DanMu.py:23> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x0000023C95FB7A98>()]>>
[ERROR] [2020-10-31 12:37:20,843] "D:\workspace\pandora-live-platform\common\DanMu.py" [send_dan_mu:37] - "弹幕信息发送到MQ异常信息:Event loop is closed"