Skip to content

我是想通过这个LoopTimer轮询获取弹幕,data_all 中有几个url地址 我就同时采集弹幕数据发送到mq中,但是这个会在轮询3次后就报错,报错如下,然后怎么添加实现停止去获取弹幕信息啊 #105

@y4127241990

Description

@y4127241990

import asyncio
import selectors
from threading import Timer

from common.DanMu import get_dan_mu

def fun_timer():
"""
创建事件循环调用弹幕处理接口
:return:
"""
selector = selectors.SelectSelector()
loop = asyncio.SelectorEventLoop(selector)
get_dan_mu(loop)
loop.stop()
loop.run_forever()
loop.close()

class LoopTimer(Timer):

def __init__(self, interval, function, args=[], kwargs={}):
    Timer.__init__(self, interval, function, args, kwargs)

def run(self):
    while True:
        self.finished.wait(self.interval)
        if self.finished.is_set():
            self.finished.set()
            break
        self.function(*self.args, **self.kwargs)

if name == 'main':
t = LoopTimer(1, fun_timer)
t.start()

import asyncio
import json
import time

import danmaku

async def send_dan_mu(q, data):
try:
while True:
m = await q.get()
if m['msg_type'] == 'danmaku':
time_local = time.localtime(time.time())
# 转换成新的时间格式(2016-05-05 20:28:54)
dt = time.strftime("%Y-%m-%d %H:%M:%S", time_local)
data['name'] = m["name"]
data['content'] = m["content"]
data['dt'] = dt
# mq配置信息
msg = json.dumps(data, ensure_ascii=False)
print('弹幕信息发送到MQ成功:{}'.format(msg))
except Exception as e:
print('弹幕信息发送到MQ异常信息:{}'.format(e))
pass

async def dan_mu_server(data):
q = asyncio.Queue()
dmc = danmaku.DanMaKuClient(data['url'], q)
asyncio.create_task(send_dan_mu(q, data))
await dmc.start()

def get_dan_mu(loop):
data_all = [(666, 111, 'https://www.huya.com/688')]
if data_all:
tasks = []
for each in data_all:
data = {}
data['a'] = each[0]
data['b'] = each[1]
data['url'] = each[2]
print('弹幕房间信息:{}'.format(data))
q = asyncio.Queue()
dmc = danmaku.DanmakuClient(data['url'], q)
task = loop.create_task(send_dan_mu(data))
await dmc.start()
tasks.append(task)
loop.run_until_complete(asyncio.wait(tasks, timeout=60))

if name == 'main':

get_dan_mu()

然后报错信息:
Task was destroyed but it is pending!
task: <Task pending coro=<dan_mu_server() running at D:\workspace\pandora-live-platform\common\DanMu.py:45> wait_for=<_GatheringFuture pending cb=[<TaskWakeupMethWrapper object at 0x0000023C95F4A7F8>()]>>
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x0000023C93636988>
Task was destroyed but it is pending!
task: <Task pending coro=<send_dan_mu() running at D:\workspace\pandora-live-platform\common\DanMu.py:23> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x0000023C951A8A68>()]>>
[ERROR] [2020-10-31 12:37:20,837] "D:\workspace\pandora-live-platform\common\DanMu.py" [send_dan_mu:37] - "弹幕信息发送到MQ异常信息:Event loop is closed"
Task was destroyed but it is pending!
task: <Task pending coro=<dan_mu_server() running at D:\workspace\pandora-live-platform\common\DanMu.py:45> wait_for=<_GatheringFuture pending cb=[<TaskWakeupMethWrapper object at 0x0000023C95FB7438>()]>>
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x0000023C95FA0908>
Task was destroyed but it is pending!
task: <Task pending coro=<send_dan_mu() running at D:\workspace\pandora-live-platform\common\DanMu.py:23> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x0000023C95FB7A98>()]>>
[ERROR] [2020-10-31 12:37:20,843] "D:\workspace\pandora-live-platform\common\DanMu.py" [send_dan_mu:37] - "弹幕信息发送到MQ异常信息:Event loop is closed"

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions