Skip to content

扩展功能举栗🌰 - 配有视频和代码 #24

@lemisky

Description

@lemisky

获取推荐的订阅用户

简单实现
"""获取订阅页面推荐用户列表"""
from aligo import Aligo


class CustomAligo(Aligo):
    """Aligo subclass adding a call for the subscription page's recommended users."""

    def user_recommend(self):
        """Return the ``items`` value of the recommended-users response.

        Posts a fixed query (20 entries, ordered by ``updated_at`` descending)
        for the current account and hands back the decoded ``items`` entry
        unchanged.
        """
        payload = {
            'limit': 20,
            'order_by': "updated_at",
            'order_direction': "DESC",
            'user_id': self.user_id,  # the caller's own user_id
        }
        response = self._post('/adrive/v1/timeline/user/recommend', body=payload)
        decoded = response.json()
        return decoded['items']


if __name__ == '__main__':
    # Demo: fetch the recommended users and dump the raw list.
    client = CustomAligo()
    recommended = client.user_recommend()
    print(recommended)
2022-05-14.15-46-37.mp4
优化一下
"""获取订阅页面推荐用户列表"""
from dataclasses import dataclass
from typing import List

from aligo import Aligo


@dataclass
class FollowUser:
    """A user record from the subscription (follow) feature.

    Instances are built in this file with ``FollowUser(**item)``, so the field
    names correspond to keys of the response items. Every field defaults to
    ``None``, which lets an item that provides only some keys be converted.
    """
    # NOTE(review): field meanings are inferred from their names — confirm against the API.
    description: str = None  # presumably the user's profile description
    avatar: str = None  # presumably the avatar image URL
    user_id: str = None
    nick_name: str = None
    phone: str = None
    is_following: bool = None  # presumably True when the current account already follows this user


class CustomAligo(Aligo):
    """Aligo subclass adding a typed call for the subscription page's recommended users."""

    @staticmethod
    def _to_follow_users(items) -> List[FollowUser]:
        """Convert raw response items to ``FollowUser`` objects.

        Keys that ``FollowUser`` does not declare are dropped instead of being
        passed to the constructor, so an extra field in the response does not
        raise ``TypeError``.
        """
        # Local import: the file header only imports ``dataclass``.
        from dataclasses import fields

        known = {f.name for f in fields(FollowUser)}
        users = []
        for item in items:
            accepted = {key: value for key, value in item.items() if key in known}
            users.append(FollowUser(**accepted))
        return users

    def user_recommend(self) -> List[FollowUser]:
        """Return the recommended users shown on the subscription page.

        Posts a fixed query (20 entries, ordered by ``updated_at`` descending)
        for the current account.

        Returns:
            One ``FollowUser`` per entry of the response's ``items`` list.

        Raises:
            KeyError: if the decoded response has no ``items`` key.
        """
        resp = self._post('/adrive/v1/timeline/user/recommend', body={
            'limit': 20,
            'order_by': "updated_at",
            'order_direction': "DESC",
            'user_id': self.user_id,  # the caller's own user_id
        })
        return self._to_follow_users(resp.json()['items'])


if __name__ == '__main__':
    # Demo: print id, nickname and follow state of each recommended user.
    client = CustomAligo()
    for entry in client.user_recommend():
        print(entry.user_id, entry.nick_name, entry.is_following)
222.mp4
完善一下
"""获取订阅页面推荐用户列表"""
from dataclasses import dataclass
from typing import List

from aligo import Aligo


@dataclass
class FollowUser:
    """A user record from the subscription (follow) feature.

    Instances are built in this file with ``FollowUser(**item)``, so the field
    names correspond to keys of the response items. Every field defaults to
    ``None``, which lets an item that provides only some keys be converted.
    """
    # NOTE(review): field meanings are inferred from their names — confirm against the API.
    description: str = None  # presumably the user's profile description
    avatar: str = None  # presumably the avatar image URL
    user_id: str = None
    nick_name: str = None
    phone: str = None
    is_following: bool = None  # presumably True when the current account already follows this user


class CustomAligo(Aligo):
    """Aligo subclass adding a typed call for the subscription page's recommended users."""

    @staticmethod
    def _to_follow_users(items) -> List[FollowUser]:
        """Convert raw response items to ``FollowUser`` objects.

        Keys that ``FollowUser`` does not declare are dropped instead of being
        passed to the constructor, so an extra field in the response does not
        raise ``TypeError``.
        """
        # Local import: the file header only imports ``dataclass``.
        from dataclasses import fields

        known = {f.name for f in fields(FollowUser)}
        users = []
        for item in items:
            accepted = {key: value for key, value in item.items() if key in known}
            users.append(FollowUser(**accepted))
        return users

    def user_recommend(self, limit: int = 20) -> List[FollowUser]:
        """Return the recommended users shown on the subscription page.

        Args:
            limit: Number of users to request; values above 100 are sent as 100.

        Returns:
            One ``FollowUser`` per entry of the response's ``items`` list.

        Raises:
            KeyError: if the decoded response has no ``items`` key.
        """
        resp = self._post('/adrive/v1/timeline/user/recommend', body={
            'limit': min(limit, 100),
            'order_by': "updated_at",
            'order_direction': "DESC",
            'user_id': self.user_id,  # the caller's own user_id
        })
        return self._to_follow_users(resp.json()['items'])


if __name__ == '__main__':
    # Demo: print id, nickname and follow state of each recommended user.
    client = CustomAligo()
    for entry in client.user_recommend():
        print(entry.user_id, entry.nick_name, entry.is_following)
333.mp4

订阅用户

订阅用户
"""
1. 获取订阅页面推荐用户列表
2. 订阅用户
"""
from dataclasses import dataclass
from typing import List

from aligo import Aligo


@dataclass
class FollowUser:
    """A user record from the subscription (follow) feature.

    Instances are built in this file with ``FollowUser(**item)``, so the field
    names correspond to keys of the response items. Every field defaults to
    ``None``, which lets an item that provides only some keys be converted.
    """
    # NOTE(review): field meanings are inferred from their names — confirm against the API.
    description: str = None  # presumably the user's profile description
    avatar: str = None  # presumably the avatar image URL
    user_id: str = None  # passed to follow_user() by the demo script in this file
    nick_name: str = None
    phone: str = None
    is_following: bool = None  # the demo script only follows users for which this is falsy


class CustomAligo(Aligo):
    """Aligo subclass adding recommended-user listing and user subscription."""

    @staticmethod
    def _to_follow_users(items) -> List[FollowUser]:
        """Convert raw response items to ``FollowUser`` objects.

        Keys that ``FollowUser`` does not declare are dropped instead of being
        passed to the constructor, so an extra field in the response does not
        raise ``TypeError``.
        """
        # Local import: the file header only imports ``dataclass``.
        from dataclasses import fields

        known = {f.name for f in fields(FollowUser)}
        users = []
        for item in items:
            accepted = {key: value for key, value in item.items() if key in known}
            users.append(FollowUser(**accepted))
        return users

    def user_recommend(self, limit: int = 20) -> List[FollowUser]:
        """Return the recommended users shown on the subscription page.

        Args:
            limit: Number of users to request; values above 100 are sent as 100.

        Returns:
            One ``FollowUser`` per entry of the response's ``items`` list.

        Raises:
            KeyError: if the decoded response has no ``items`` key.
        """
        resp = self._post('/adrive/v1/timeline/user/recommend', body={
            'limit': min(limit, 100),
            'order_by': "updated_at",
            'order_direction': "DESC",
            'user_id': self.user_id,  # the caller's own user_id
        })
        return self._to_follow_users(resp.json()['items'])

    def follow_user(self, user_id: str) -> bool:
        """Subscribe to (follow) the user with the given id.

        Args:
            user_id: Id of the user to follow.

        Returns:
            True when the response status code is 200, otherwise False.
        """
        resp = self._post('/adrive/v1/member/follow_user', body={
            'user_id': user_id
        })
        return resp.status_code == 200


if __name__ == '__main__':
    client = CustomAligo()
    # Request 3 recommended users and follow each one not followed yet.
    for candidate in client.user_recommend(3):
        if candidate.is_following:
            continue
        followed = client.follow_user(candidate.user_id)
        print(followed)
    # Just remembered: aligo is logged in with a different account
444_1.mp4
444_2.mp4

取消订阅

取消订阅
"""
1. 获取订阅页面推荐用户列表
2. 订阅用户
3. 取消订阅用户
"""
from dataclasses import dataclass
from typing import List

from aligo import Aligo


@dataclass
class FollowUser:
    """A user record from the subscription (follow) feature.

    Instances are built in this file with ``FollowUser(**item)``, so the field
    names correspond to keys of the response items. Every field defaults to
    ``None``, which lets an item that provides only some keys be converted.
    """
    # NOTE(review): field meanings are inferred from their names — confirm against the API.
    description: str = None  # presumably the user's profile description
    avatar: str = None  # presumably the avatar image URL
    user_id: str = None  # passed to follow_user() by the demo script in this file
    nick_name: str = None
    phone: str = None
    is_following: bool = None  # the demo script only follows users for which this is falsy


class CustomAligo(Aligo):
    """Aligo subclass adding recommended-user listing, follow and unfollow."""

    @staticmethod
    def _to_follow_users(items) -> List[FollowUser]:
        """Convert raw response items to ``FollowUser`` objects.

        Keys that ``FollowUser`` does not declare are dropped instead of being
        passed to the constructor, so an extra field in the response does not
        raise ``TypeError``.
        """
        # Local import: the file header only imports ``dataclass``.
        from dataclasses import fields

        known = {f.name for f in fields(FollowUser)}
        users = []
        for item in items:
            accepted = {key: value for key, value in item.items() if key in known}
            users.append(FollowUser(**accepted))
        return users

    def user_recommend(self, limit: int = 20) -> List[FollowUser]:
        """Return the recommended users shown on the subscription page.

        Args:
            limit: Number of users to request; values above 100 are sent as 100.

        Returns:
            One ``FollowUser`` per entry of the response's ``items`` list.

        Raises:
            KeyError: if the decoded response has no ``items`` key.
        """
        resp = self._post('/adrive/v1/timeline/user/recommend', body={
            'limit': min(limit, 100),
            'order_by': "updated_at",
            'order_direction': "DESC",
            'user_id': self.user_id,  # the caller's own user_id
        })
        return self._to_follow_users(resp.json()['items'])

    def follow_user(self, user_id: str) -> bool:
        """Subscribe to (follow) the user with the given id.

        Args:
            user_id: Id of the user to follow.

        Returns:
            True when the response status code is 200, otherwise False.
        """
        resp = self._post('/adrive/v1/member/follow_user', body={
            'user_id': user_id
        })
        return resp.status_code == 200

    def unfollow_user(self, user_id: str) -> bool:
        """Cancel the subscription to (unfollow) the user with the given id.

        Args:
            user_id: Id of the user to unfollow.

        Returns:
            True when the response status code is 200, otherwise False.
        """
        resp = self._post('/adrive/v1/member/unfollow_user', body={
            'user_id': user_id
        })
        return resp.status_code == 200


if __name__ == '__main__':
    client = CustomAligo()
    # Request 3 recommended users and follow each one not followed yet.
    for candidate in client.user_recommend(3):
        if candidate.is_following:
            continue
        followed = client.follow_user(candidate.user_id)
        print(followed)
555.mp4

获取订阅用户列表

获取订阅用户列表
"""
1. 获取订阅页面推荐用户列表
2. 订阅用户
3. 取消订阅用户
4. 获取订阅用户列表
"""
from dataclasses import dataclass, field
from typing import List

from aligo import Aligo


@dataclass
class FollowUser:
    """A user record from the subscription (follow) feature.

    Instances are built in this file with ``FollowUser(**item)``, so the field
    names correspond to keys of the response items. Scalar fields default to
    ``None`` and ``latest_messages`` to an empty list, which lets an item that
    provides only some keys be converted.
    """
    # NOTE(review): field meanings are inferred from their names — confirm against the API.
    description: str = None  # presumably the user's profile description
    avatar: str = None  # presumably the avatar image URL
    user_id: str = None
    nick_name: str = None
    phone: str = None
    is_following: bool = None  # presumably True when the current account already follows this user
    has_unread_message: bool = None
    latest_messages: List[dict] = field(default_factory=list)  # default_factory gives each instance its own list


class CustomAligo(Aligo):
    """Aligo subclass adding recommended-user listing, follow, unfollow and following list."""

    @staticmethod
    def _to_follow_users(items) -> List[FollowUser]:
        """Convert raw response items to ``FollowUser`` objects.

        Keys that ``FollowUser`` does not declare are dropped instead of being
        passed to the constructor, so an extra field in either endpoint's
        response does not raise ``TypeError``.
        """
        # Local import: the file header imports ``dataclass`` and ``field`` only.
        from dataclasses import fields

        known = {f.name for f in fields(FollowUser)}
        users = []
        for item in items:
            accepted = {key: value for key, value in item.items() if key in known}
            users.append(FollowUser(**accepted))
        return users

    def user_recommend(self, limit: int = 20) -> List[FollowUser]:
        """Return the recommended users shown on the subscription page.

        Args:
            limit: Number of users to request; values above 100 are sent as 100.

        Returns:
            One ``FollowUser`` per entry of the response's ``items`` list.

        Raises:
            KeyError: if the decoded response has no ``items`` key.
        """
        resp = self._post('/adrive/v1/timeline/user/recommend', body={
            'limit': min(limit, 100),
            'order_by': "updated_at",
            'order_direction': "DESC",
            'user_id': self.user_id,  # the caller's own user_id
        })
        return self._to_follow_users(resp.json()['items'])

    def follow_user(self, user_id: str) -> bool:
        """Subscribe to (follow) the user with the given id.

        Args:
            user_id: Id of the user to follow.

        Returns:
            True when the response status code is 200, otherwise False.
        """
        resp = self._post('/adrive/v1/member/follow_user', body={
            'user_id': user_id
        })
        return resp.status_code == 200

    def unfollow_user(self, user_id: str) -> bool:
        """Cancel the subscription to (unfollow) the user with the given id.

        Args:
            user_id: Id of the user to unfollow.

        Returns:
            True when the response status code is 200, otherwise False.
        """
        resp = self._post('/adrive/v1/member/unfollow_user', body={
            'user_id': user_id
        })
        return resp.status_code == 200

    def list_following(self, limit: int = 20) -> List[FollowUser]:
        """Return the users the current account is subscribed to.

        Args:
            limit: Number of users to request; sent to the endpoint as given
                (unlike ``user_recommend``, no upper bound is applied here).

        Returns:
            One ``FollowUser`` per entry of the response's ``items`` list.

        Raises:
            KeyError: if the decoded response has no ``items`` key.
        """
        resp = self._post('/adrive/v1/member/list_following', body={
            "limit": limit, "order_by": "updated_at", "order_direction": "DESC"
        })
        return self._to_follow_users(resp.json()['items'])


if __name__ == '__main__':
    # Demo: print every user the current account is subscribed to.
    client = CustomAligo()
    for followed in client.list_following():
        print(followed)
666_1.mp4
666_2.mp4

Metadata

Metadata

Assignees

No one assigned

    Labels

    documentation: Improvements or additions to documentation

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions