Sanic 是一个 Python 异步 IO Web 框架和 Server,使用了 asyncio 来简化异步 IO 编程。由于使用了异步 IO,用 Sanic 编写的 Web 服务可以支持非常高的并发,通常来说单核机器可以同时承受上万的连接。除开使用异步 IO,Sanic 在使用感受上非常类似于 Flask 框架,熟悉 Flask 的可以很快上手。使用异步 IO 的好处是为了在某个任务执行 IO(读写磁盘和网络数据)操作的时候能够把 CPU 释放出来给其它任务,而不是让 CPU 空闲等待,这样可以大大提升 CPU 的使用率。对于需要读写数据库和缓存等后端服务的应用,仅仅在读写请求数据的时候为异步还不够,读写后端服务数据同样需要为异步,否则 CPU 在等待后端服务处理时仍然会空闲等待。可以使用 aiomysqlaioredis 这样的异步 IO 库来分别访问 MySQL 和 Redis。另外我们还会使用 Fire 框架来编写命令行应用,以及 APScheduler 任务管理器来定时运行异步任务。

为什么选择 Sanic

AsyncIO 介绍

Python 3.4 开始引入 asyncio 模块,使得 Python 也支持异步 IO。3.5 版本里添加了 async/await 关键字,使得异步 IO 代码编写更加方便。3.6 和 3.7 版本继续进行了完善,截止到目前最新的 3.7 版本,asyncio 模块已经非常成熟和稳定。

AsyncIO 引入了协程(coroutine)的概念。协程可以看做是用户态的线程,它由应用本身来调度运行,而不是系统内核。协程的开销远低于线程,普通的机器上都可以运行成千上万的协程。

下面的代码把 main 函数定义为了一个协程:

import asyncio

async def main():
    print('hello')
    await asyncio.sleep(1)
    print('world')

相比普通的函数定义,前面多了一个 async 关键字。有了这个关键字,就可以在函数内部使用 await 关键字,await 用来等待其它的协程运行完成。与普通的 main 函数不同,如果我们执行它,将得到一个协程对象,函数体并没有被运行。注意协程这个词有两种含义,一种是协程定义,另一种是创建完成的协程对象。

那么如何执行协程代码了?可以使用 asyncio.run 函数,它会创建一个 loop,然后把协程注册到这个 loop 里去运行,待协程运行结束后再关闭 loop。loop 的作用是调度协程运行,一个 loop 里可以注册很多协程。当某个协程需要等待外部 IO 完成时会主动挂起(通过 await 表明)自己,让出 CPU 给其它等待运行的协程。待外部 IO 完成时,loop 会收到事件通知,原先被挂起的协程将变成待运行状态,等待其它协程让出 CPU 后运行。如果有多个协程等待运行,那么 loop 会采用某种调度算法从中选择一个来运行。

asyncio.run(main())

更多有关协程的介绍和使用,可以参阅 Python 官方文档 asyncio

Sanic 介绍

随着 Python asyncio 模块的推出,出现了一些支持异步 IO 的 Web 框架,其中最受欢迎的是 Sanic,其 GitHub Star 数目前已经过万。Sanic 最新版本要求使用 Python 3.6+ 版本,其底层使用了性能更好的 uvloop 来替换 asyncio 默认的事件循环。uvloop 是 Cython 写的,构建于 libuv 之上,相比其它事件循环,速度差不多要快一倍,几乎接近于 Go 的速度。同时在使用体验上又很类似于 Flask 框架,比如请求对象和响应对象的属性和方法、请求处理器、路由、蓝图、异常处理等,熟悉 Flask 框架的同学可以无缝切入。

开发实战

本次开发我们以围观 APP 的用户系统为例,来讲解如何使用 Sanic、AIOMySQL、AIORedis 等框架和库来实现注册、登录、退出、用户统计等功能。完整代码可从 GitHub 获取 围观 Server

确定目录结构

为了便于理解代码,首先来看一下完整的项目目录结构。

.
├── docker-compose.yml # Docker Compose 配置文件
├── Dockerfile # Docker 镜像构建文件
├── README.md
├── requirements.txt # PIP 依赖包
├── venv # Python 虚拟环境
└── weiguan # Python 顶层包
    ├── __init__.py
    ├── app.py # 应用服务入口
    ├── blueprints # 蓝图,放置对外接口
    │   ├── __init__.py
    │   ├── account.py # 帐号
    │   ├── common.py # 公共函数
    │   ├── message.py # 消息
    │   └── user.py # 用户
    ├── commands # 管理命令
    │   ├── __init__.py
    │   ├── model.py # 模型管理
    │   └── schedule.py # 定时任务管理
    ├── config # 配置
    │   ├── __init__.py
    │   ├── base.py # 基础配置
    │   └── log.py # 日志配置
    ├── manage.py # 管理命令入口
    ├── models # 模型,主要是表定义
    │   ├── __init__.py
    │   ├── common.py # 公共函数
    │   ├── file.py # 文件
    │   ├── message.py # 消息
    │   ├── post_image.py # 动态图片
    │   ├── post_like.py # 动态点赞
    │   ├── post_stat.py # 动态统计
    │   ├── post.py # 动态
    │   ├── security.py # 安全
    │   ├── user_follow.py # 用户关注
    │   ├── user_stat.py # 用户统计
    │   └── user.py # 用户
    ├── services # 业务逻辑层
    │   ├── __init__.py
    │   ├── common.py # 公共函数
    │   ├── stat.py # 统计
    │   └── user.py # 用户
    └── utils # 工具函数
        ├── __init__.py
        ├── datetime.py # 日期时间
        └── string.py # 字符串

为了防止命名冲突,本项目的所有 Python 代码都放在了 weiguan 这个 package 下。为了方便部署,项目依赖的所有 PIP 包都通过 pip freeze >requirements.txt 导出到了 requirements.txt 文件中,里面包含了各个依赖包名及其版本号。

创建虚拟环境

为了避免同一机器不同项目之间出现 PIP 包版本冲突,最好为每个项目创建一个独立的 Python 环境。Python 3.3+ 自带的 venv 模块就是用来管理虚拟环境的,项目的虚拟环境可以创建在项目根目录下。由于虚拟环境是跟本地 Python 绑定的,所以不能提交到 Git 仓库,需要把它加入到 .gitignore 文件里。

进入项目根目录,执行下面的命令来创建一个名为 venv 的虚拟环境。

python3 -m venv venv

定义表模型及其序列化器

使用数据库的第一步是定义表模型,表模型统一放在 models 子目录下。我们使用 AIOMySQL 来访问 MySQL 数据库,它基于 PyMySQL,提供了类似的 API,同时它还支持 SQLAlchemy。使用 SQLAlchemy 可以避免裸写 SQL,方便的同时还能避免 SQL 注入攻击,因此我们将使用 SQLAlchemy 方式来访问数据库。

从数据库查询出来的对象,如果要通过 API 响应给客户端,需要进行序列化。序列化通常都采用 JSON 格式,不过并不是所有的 Python 类型都支持 JSON 序列化,比如 datetime 类型,即便支持可能默认的格式也不是我们想要的。有时我们还希望对序列化后的 JSON 对象属性进行裁剪和转换,比如删除某些需要保密的字段。使用 Marshmallow 这个库,可以允许我们自由定义序列化后的 JSON 对象格式,同时还能避免对外输出对象的结构跟内部对象的结构紧耦合。

下面是用户表和用户统计表的表模型定义及其序列化器。

weiguan/models/user.py

import sqlalchemy as sa
import sqlalchemy.sql as sasql
from marshmallow import Schema, fields

from .common import metadata, LocalDateTime

UserModel = sa.Table(
    'user', metadata,
    sa.Column("id", sa.Integer, nullable=False, primary_key=True,
              comment='ID'),
    sa.Column('username', sa.VARCHAR(20), nullable=False, comment='用户名'),
    sa.Column('salt', sa.CHAR(64), nullable=False, comment='密钥'),
    sa.Column('password', sa.CHAR(64), nullable=False, comment='已加密的密码'),
    sa.Column('mobile', sa.CHAR(11), nullable=True, comment='手机号'),
    sa.Column('intro', sa.VARCHAR(100), nullable=False, server_default='',
              comment='自我介绍'),
    sa.Column('avatar_id', sa.Integer, nullable=True, comment='头像文件 ID'),
    sa.Column("created_at", LocalDateTime, nullable=False,
              server_default=sasql.text('CURRENT_TIMESTAMP'),
              comment='创建时间'),
    sa.Column("updated_at", LocalDateTime, nullable=False,
              server_default=sasql.text(
                  'CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP'),
              comment='更新时间'),
    sa.Index('idx_username', 'username', unique=True),
    sa.Index('idx_mobile', 'mobile', unique=True),
    sa.Index('idx_avatar_id', 'avatar_id'),
    sa.ForeignKeyConstraint(['avatar_id'], ['file.id'], ondelete='CASCADE',
                            onupdate='CASCADE', use_alter=True),
    comment='用户',
)


class UserSchema(Schema):
    """用户序列化器
    """
    id = fields.Integer()
    username = fields.Str()
    mobile = fields.Str()
    intro = fields.Str()
    avatarId = fields.Integer(attribute='avatar_id')
    createdAt = fields.DateTime(attribute='created_at')
    updatedAt = fields.DateTime(attribute='updated_at')

    avatar = fields.Nested('FileSchema')

weiguan/models/user_stat.py

from enum import Enum

import sqlalchemy as sa
import sqlalchemy.sql as sasql
from marshmallow import Schema, fields

from .common import metadata, LocalDateTime
from .user import UserSchema


UserStatModel = sa.Table(
    'user_stat', metadata,
    sa.Column("id", sa.Integer, nullable=False, primary_key=True,
              comment='ID'),
    sa.Column('user_id', sa.Integer, nullable=False, comment='用户 ID'),
    sa.Column('post_count', sa.Integer, nullable=False, server_default='0',
              comment='发布动态数'),
    sa.Column('like_count', sa.Integer, nullable=False, server_default='0',
              comment='喜欢动态数'),
    sa.Column('following_count', sa.Integer, nullable=False, server_default='0',
              comment='关注人数'),
    sa.Column('follower_count', sa.Integer, nullable=False, server_default='0',
              comment='粉丝人数'),
    sa.Column("created_at", LocalDateTime, nullable=False,
              server_default=sasql.text('CURRENT_TIMESTAMP'),
              comment='创建时间'),
    sa.Column("updated_at", LocalDateTime, nullable=False,
              server_default=sasql.text(
                  'CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP'),
              comment='更新时间'),
    sa.Index('idx_user_id', 'user_id', unique=True),
    sa.ForeignKeyConstraint(['user_id'], ['user.id'], ondelete='CASCADE',
                            onupdate='CASCADE', use_alter=True),
    comment='用户统计',
)


class UserStatSchema(Schema):
    """用户统计序列化器
    """
    id = fields.Integer()
    userId = fields.Integer(attribute='user_id')
    postCount = fields.Integer(attribute='post_count')
    likeCount = fields.Integer(attribute='like_count')
    followingCount = fields.Integer(attribute='following_count')
    followerCount = fields.Integer(attribute='follower_count')
    createdAt = fields.DateTime(attribute='created_at')
    updatedAt = fields.DateTime(attribute='updated_at')

    user = fields.Nested(UserSchema)

编写模型管理命令

有了表模型之后,就可以利用它来在数据库里创建表,而不用再去编写建表的 SQL,这也是使用 SQLAlchemy 的好处之一。可以打开 Python 的交互式命令行来执行建表语句,但在建表之前需要导入一些依赖包,并执行一些初始化工作,比如加载应用配置、创建数据库连接等。这种方式适合只执行一次的临时操作,如果一个操作要重复执行多次,最好将它实现为一条命令。

得益于 Python 强大的动态性,有许多功能强大的 Python 命令行应用框架,比如 Google 出品的 Fire。使用它,只需一行代码就可以把一个函数、一个类的所有方法或者一个模块的所有函数暴露为可在命令行执行的命令。我们将使用它来编写模型管理命令,其中包含建表的子命令,后面还会编写定时任务管理命令。

Fire 支持多层级的命令结构,为了方便记忆和管理,我们把项目的所有命令都放在 manage 这个根命令下。manage 负责初始化命令执行环境,比如加载应用配置、初始化数据库连接池等,并添加其它管理命令作为其子命令。

weiguan/manage.py

import logging
import asyncio

import fire

from .config import config, get_log_config
from .models import init_db, init_cache, init_ws
from .commands import Model, Schedule


class Manage(object):
    """命令行管理工具,比如建表和运行定时任务
    """

    def __init__(self, config):
        self.config = config

    async def init(self):
        self.db = await init_db(self.config)
        self.cache = await init_cache(self.config)
        self.ws = await init_ws(self.config, self.db, self.cache)

        self.model = Model(self.config, self.db, self.cache)
        self.schedule = Schedule(self.config, self.db, self.cache, self.ws)


if __name__ == '__main__':
    logging.config.dictConfig(get_log_config(config))

    manage = Manage(config)
    asyncio.get_event_loop().run_until_complete(manage.init())

    fire.Fire(manage)

由于 Fire 不支持协程,而我们的数据库、缓存、WebSocket 的初始化函数都是协程,所以需要我们自己先完成这些初始化协程的运行,再执行 Fire 的初始化。首先调用 asyncio.get_event_loop 获取到当前线程的 loop(如果没有会自动创建一个),然后调用其 run_until_complete 方法来运行协程并等待其运行结束。初始化工作完成之后,就可以调用 fire.Fire 方法来将 Manage 对象的各个属性暴露为子命令。

创建完成根命令之后,就可以在 commands 目录下创建各种管理子命令。先来创建模型管理子命令,定时任务子命令稍后会创建。

weiguan/commands/model.py

import sqlalchemy as sa

from ..models import metadata


class Model(object):
    """库表结构管理
    """

    def __init__(self, config, db, cache):
        self.config = config
        self.db = db
        self.cache = cache

        self.engine = sa.create_engine(
            'mysql://{}:{}@{}:{}/{}?charset=utf8mb4'.format(
                config['MYSQL_USER'], config['MYSQL_PASSWORD'],
                config['MYSQL_HOST'], config['MYSQL_PORT'],
                config['MYSQL_DB']))

    def create_tables(self, tables=None):
        """创建表
        """
        if tables is None:
            tables = metadata.tables.keys()
        elif isinstance(tables, str):
            tables = [tables]

        for table in tables:
            metadata.tables[table].create(self.engine, True)

model 子命令又包含一个子命令 create_tables,不带任何参数执行它会创建所有表,也可以通过 tables 参数指定要创建哪张表。

如果要创建所有表,可以执行如下命令。

python -u -m weiguan.manage model create_tables

如果只想创建某张表,比如 user,可以指定 --tables 选项。

python -u -m weiguan.manage model create_tables --tables user

还可以参数的方式来传递,省去指定选项,参数顺序跟函数定义保持一致。

python -u -m weiguan.manage model create_tables user

实现业务逻辑

前面的模型层只负责内部数据跟外部系统的交换,比如写入数据库或从数据库读取,不包含业务逻辑。业务逻辑是应用的核心,为了降低跟外部系统的耦合,一般把它独立到一层中。业务逻辑按功能模块进行划分,都放在 services 目录下。

以用户模块为例,包含了注册、登录、修改、查询单个、查询列表等接口。

weiguan/services/user.py

import os
import hashlib
import string

import sqlalchemy.sql as sasql

from ..models import UserModel
from ..models import SecurityModel
from ..utils.string import random_string


class UserService(object):
    """用户相关业务
    """

    def __init__(self, config, db, cache):
        self.config = config
        self.db = db
        self.cache = cache

    async def create(self, **data):
        """创建用户
        """
        async with self.db.acquire() as conn:
            data['salt'] = hashlib.sha256(os.urandom(60)).hexdigest()
            data['password'] = hashlib.sha256(
                (data['password'] + data['salt']).encode()).hexdigest()
            result = await conn.execute(sasql.insert(UserModel).values(**data))
            id = result.lastrowid

        return await self.info(id)

    async def edit(self, id, **data):
        """修改用户
        """
        data = {k: v for k, v in data.items() if v is not None}

        user = await self.info(id)
        if 'password' in data:
            data['password'] = hashlib.sha256(
                (data['password'] + user['salt']).encode()).hexdigest()

        async with self.db.acquire() as conn:
            await conn.execute(
                sasql.update(UserModel).where(UserModel.c.id == id)
                .values(**data))

        return await self.info(id)

    async def login_by_username(self, username, password):
        """使用用户名登录
        """
        async with self.db.acquire() as conn:
            result = await conn.execute(
                UserModel.select().where(UserModel.c.username == username))
            row = await result.first()

            if (row is not None and
                    row['password'] == hashlib.sha256(
                        (password + row['salt']).encode()).hexdigest()):
                return dict(row)
            else:
                return None

    async def login_by_mobile(self, mobile, password):
        """使用手机号登录
        """
        async with self.db.acquire() as conn:
            result = await conn.execute(
                UserModel.select().where(UserModel.c.mobile == mobile))
            row = await result.first()

            if (row is not None and
                    row['password'] == hashlib.sha256(
                        (password + row['salt']).encode()).hexdigest()):
                return dict(row)
            else:
                return None

    async def info(self, id):
        """查询单个用户
        """
        async with self.db.acquire() as conn:
            result = await conn.execute(
                UserModel.select().where(UserModel.c.id == id))
            row = await result.first()

            return None if row is None else dict(row)

    async def infos(self, ids):
        """查询多个用户
        """
        async with self.db.acquire() as conn:
            result = await conn.execute(
                UserModel.select().where(UserModel.c.id.in_(ids)))
            d = {v['id']: dict(v) for v in await result.fetchall()}

            return [d[v] for v in ids]

    async def list_(self, *, limit=None, offset=None):
        """查询用户列表
        """
        async with self.db.acquire() as conn:
            select_sm = UserModel.select()
            count_sm = sasql.select(
                [sasql.func.count()]).select_from(UserModel)

            select_sm = select_sm.order_by(UserModel.c.created_at.desc())

            if limit is not None:
                select_sm = select_sm.limit(limit)
            if offset is not None:
                select_sm = select_sm.offset(offset)

            result = await conn.execute(select_sm)
            rows = [dict(v) for v in await result.fetchall()]

            result = await conn.execute(count_sm)
            total = await result.scalar()

            return (rows, total)

    async def send_mobile_register_verify_code(self, mobile):
        """发送手机号注册验证码
        """
        security_model = SecurityModel(self.config, self.db, self.cache)
        code = await security_model.get_verify_code('mobile_register', mobile)
        if code is None:
            code = random_string(6, string.digits)
            await security_model.set_verify_code(
                'mobile_register', mobile, code)

            # TODO 调用第三方 API 发送验证码手机短信

        return code

    async def verify_mobile_register_code(self, mobile, code):
        """验证手机号注册验证码
        """
        security_model = SecurityModel(self.config, self.db, self.cache)
        sended = await security_model.get_verify_code('mobile_register', mobile)
        if sended is None or sended != code:
            return False

        await security_model.delete_verify_code('mobile_register', mobile)
        return True

处理请求

考虑到以后接口可能会很多,为了代码的可维护性,有必要使用蓝图(blueprint)来按功能模块对接口进行分组。蓝图都放在 blueprints 目录下,一个蓝图下的接口拥有相同的路径前缀。

以帐号蓝图为例,其下包含了登录、注册、退出等跟帐号管理相关的接口。

weiguan/blueprints/account.py

from sanic import Blueprint
from sanic.exceptions import NotFound

from ..services import UserService
from ..models import UserSchema
from .common import ResponseCode, response_json, authenticated

account = Blueprint('account', url_prefix='/account')


@account.post('/register')
async def register(request):
    """注册
    """
    data = request.json
    username = data['username']
    password = data['password']

    user_service = UserService(
        request.app.config, request.app.db, request.app.cache)
    user = await user_service.create(
        username=username, password=password)

    return response_json(user=UserSchema().dump(user))


@account.post('/login')
async def login(request):
    """登录
    """
    data = request.json
    username = data.get('username')
    mobile = data.get('mobile')
    password = data['password']

    user_service = UserService(
        request.app.config, request.app.db, request.app.cache)
    if username is not None:
        user = await user_service.login_by_username(
            username, password)
    elif mobile is not None:
        user = await user_service.login_by_mobile(
            mobile, password)
    else:
        user = None

    if user is not None:
        request['session']['user'] = UserSchema().dump(user)

        return response_json(user=UserSchema().dump(user))
    else:
        return response_json(ResponseCode.FAIL, '帐号或密码错误')


@account.get('/logout')
async def logout(request):
    """退出
    """
    request['session'].pop('user', None)

    return response_json()


@account.get('/info')
async def info(request):
    """获取当前登录用户信息
    """
    user = request['session'].get('user')

    return response_json(user=user)


@account.post('/edit')
@authenticated()
async def edit(request):
    """编辑资料
    """
    data = request.json
    id = request['session']['user']['id']
    username = data.get('username')
    password = data.get('password')
    mobile = data.get('mobile')
    avatar_id = data.get('avatarId')
    code = data.get('code')

    user_service = UserService(
        request.app.config, request.app.db, request.app.cache)

    if (mobile is not None and
            not (await user_service.verify_mobile_register_code(mobile, code))):
        return response_json(ResponseCode.FAIL, '验证码错误')

    user = await user_service.edit(
        id, username=username, password=password, mobile=mobile,
        avatar_id=avatar_id)

    request['session']['user'] = UserSchema().dump(user)

    return response_json(user=UserSchema().dump(user))


@account.post('/send/mobile/verify/code')
@authenticated()
async def send_mobile_verify_code(request):
    """发送手机注册验证码
    """
    data = request.json
    mobile = data.get('mobile')

    user_service = UserService(
        request.app.config, request.app.db, request.app.cache)
    verify_code = await user_service.send_mobile_register_verify_code(mobile)

    return response_json(verify_code=verify_code)

应用运行入口

所有组件都准备妥当之后,我们需要有一个入口程序来将这些组件集成起来,并启动我们的应用。

weiguan/app.py

from sanic import Sanic
from sanic_session import Session, AIORedisSessionInterface

from .config import config, log_config
from .models import init_db, close_db, init_cache, close_cache, init_ws
from .blueprints import handle_exception, account, message, user

app = Sanic(config['NAME'].capitalize(), log_config=log_config)
app.config.update(config)

app.error_handler.add(Exception, handle_exception)

app.blueprint(account)
app.blueprint(message)
app.blueprint(user)

session = Session()


@app.listener('before_server_start')
async def server_init(app, loop):
    """服务初始化
    """
    app.db = await init_db(config)

    app.cache = await init_cache(config)

    session.init_app(app, AIORedisSessionInterface(
        app.cache, expiry=config['SESSION_EXPIRY']))

    app.ws = await init_ws(config, app.db, app.cache)


@app.listener('after_server_stop')
async def server_clean(app, loop):
    """服务资源清理
    """
    await close_db(app.db)
    await close_cache(app.cache)


if __name__ == '__main__':
    app.run(host=config['HOST'], port=config['PORT'], debug=config['DEBUG'],
            auto_reload=config['AUTO_RELOAD'], access_log=config['ACCESS_LOG'],
            workers=config['WORKERS'])

这个入口程序完成了以下这些工作:

  1. 加载应用配置
  2. 创建 Sanic 应用
  3. 设置全局异常处理
  4. 注册各个蓝图
  5. 创建 Session 管理对象
  6. 启动应用时初始化数据库、缓存、会话等资源
  7. 停止应用时清理资源
  8. 最后启动应用,等待请求到来

执行定时任务

有的时候需要在后台定时执行一些任务,比如发送订阅邮件、计算统计数据等。传统的方式是使用 Crontab,但这个需要修改操作系统配置,不方便管理。特别是现在的应用大多使用 Docker,甚至是 Serverless 方式来部署,服务器对它们来说是透明的。Python 已经有不少成熟的定时任务管理系统,但大多不支持异步任务,因此我们选择了支持异步任务运行的 APScheduler。

同样我们需要有一个管理命令来启动定时任务。

weiguan/commands/schedule.py

import logging
import asyncio
from datetime import datetime, time, timedelta
import traceback

from marshmallow.utils import pprint
from apscheduler.schedulers.asyncio import AsyncIOScheduler

from ..utils import local_now
from ..models import UserStatSchema
from ..services import StatService

logger = logging.getLogger('app')


class Schedule(object):
    """定时任务管理
    """

    def __init__(self, config, db, cache, ws):
        self.config = config
        self.db = db
        self.cache = cache
        self.ws = ws

    async def stat_user(self, user_id=None):
        stat_service = StatService(self.config, self.db, self.cache)
        if user_id is None:
            await stat_service.stat_all_users()
        else:
            user_stat = await stat_service.stat_user(user_id)
            pprint(UserStatSchema().dump(user_stat))

    def run(self, task=None, *args, **kwargs):
        if task is None:
            scheduler = AsyncIOScheduler()
            scheduler.add_job(self.stat_user, 'interval', hours=1)
            scheduler.start()

            try:
                asyncio.get_event_loop().run_forever()
            except (KeyboardInterrupt, SystemExit):
                pass
        else:
            method = getattr(self, task)
            asyncio.get_event_loop().run_until_complete(method(*args, **kwargs))

上面的定时任务管理器会每隔 1 小时计算所有用户的统计数据,其启动方式如下。

python -u -m weiguan.manage schedule run

还可指定某个具体的任务来立即运行一次,以方便测试。

python -u -m weiguan.manage schedule run stat_user

部署

整个项目需要部署的服务包括应用服务、定时任务服务、MySQL 服务和 Redis 服务,如果采用传统方式,部署需要不少时间,特别是安装和配置 MySQL。但现在我们无需如此,使用 Docker 可以很快地部署好这些服务。

首先构建应用镜像。应用服务和定时任务可以共用一个镜像,默认启动应用服务,定时任务可以在启动容器时修改启动命令来运行。Docker 镜像构建文件为 Dockerfile

Dockerfile

FROM python:3.7

WORKDIR /app

COPY requirements.txt .
RUN pip install -r requirements.txt

COPY . .

EXPOSE 8000

CMD ["python", "-u", "-m", "weiguan.app"]

这里我们先拷贝了 requirements.txt 文件,再拷贝剩余文件,而不是一次拷贝所有文件。这是为了利用 Docker 构建缓存,只要 requirements.txt 文件不发生变化,后面的 pip install 命令就不会重新执行,而是利用上次构建的缓存,这样可以大大节省构建时间。

在项目根目录下执行以下命令来构建镜像。

docker build -t weiguan.app/server .

然后使用 Docker Compose 来一次启动所有服务。Docker Compose 配置文件为 docker-compose.yml

version: '2'
services:
  server:
    image: weiguan.app/server
    environment:
      TZ: Asia/Shanghai
      WG_DATA_PATH: /data
      WG_DEBUG: 'false'
      WG_AUTO_RELOAD: 'false'
      WG_MYSQL_HOST: mysql
      WG_MYSQL_PORT: 3306
      WG_MYSQL_USER: weiguan
      WG_MYSQL_PASSWORD: jaggerwang
      WG_REDIS_URI: redis://@redis:6379/0
    volumes:
    - /data/weiguan/server:/data
  cron:
    image: weiguan.app/server
    command: python -u -m weiguan.manage schedule run
    environment:
      TZ: Asia/Shanghai
      WG_DATA_PATH: /data
      WG_DEBUG: 'false'
      WG_AUTO_RELOAD: 'false'
      WG_MYSQL_HOST: mysql
      WG_MYSQL_PORT: 3306
      WG_MYSQL_USER: weiguan
      WG_MYSQL_PASSWORD: jaggerwang
      WG_REDIS_URI: redis://@redis:6379/0
    volumes:
    - /data/weiguan/cron:/data
  mysql:
    image: mysql:5.7
    environment:
      TZ: Asia/Shanghai
      MYSQL_ROOT_PASSWORD: jaggerwang
    volumes:
    - /data/weiguan/mysql:/var/lib/mysql
  redis:
    image: redis:5
    environment:
      TZ: Asia/Shanghai
    volumes:
    - /data/weiguan/redis:/data
    

在项目根目录下执行以下命令来启动所有服务。

docker-compose -p weiguan up

如果需要停止所有服务,可以执行 docker-compose -p weiguan down。Docker 会为项目自动创建一个网络,并且把项目里的所有容器都加入这个网络,因此项目的各个容器之间可以互相访问。网络名字默认为当前所在目录的名字,可以通过 -p--project-name 选项来修改。更详细的部署说明可查看 GitHub 仓库里的 README 文档。

异步 IO 不是万能的

没有什么技术是万能的,异步 IO 同样如此。虽然 asyncio 已经大大简化了异步 IO 编程,但开发者仍然需要在每个发生 IO 的地方使用 await 关键字来显式让出 CPU。对于 IO 密集型应用,代码里将到处充斥着 async/await 关键字。另外,虽然可以同时存在多个协程,但某一时刻只能有一个协程被运行,只能做到并发而不是并行,除非使用多进程(Python 的多线程有全局锁限制无法并行运行)。所有基于事件循环机制来调度协程的语言都有这个问题,包括 Node.js、PHP、Ruby 等。

如果是计算密集型的应用,建议使用像 Go 这样的真正支持并行执行的语言。Go 通过 goroutine 来支持并行执行,可以充分利用多核性能,并且编写代码时无需使用 async/await 这样的关键字。不过大多数 Web 服务都是 IO 密集型的,大量的计算会交给后端的服务,比如数据库,去完成,使用 Python 这样的动态解释型语言是可行的,并且开发效率比 Go 这样的静态编译型语言要高。

总之,没有最好的语言,只有最适合的语言。