Sanic 是一个 Python 异步 IO Web 框架和 Server,使用了 AsyncIO 来异步处理 HTTP 请求。由于使用了异步 IO,用 Sanic 编写的 Web 服务可以支持非常高的并发。除了使用异步 IO,Sanic 在使用感受上非常类似于 Flask 框架,但其并发性能约为 Flask 的十倍。

本文对 Python AsyncIO 和 Sanic 框架做了入门介绍,如想全面和深入学习相关知识,可报名学习课程 叽歪课堂 - Python Sanic 高并发服务开发实战

技术基础

AsyncIO

Python 3.4 开始引入 asyncio 模块,使得 Python 也支持异步 IO。3.5 版本里添加了 async/await 关键字,使得异步 IO 代码编写更加方便。3.6 和 3.7 版本继续进行了完善,截止到目前最新的 3.7 版本,asyncio 模块已经非常成熟和稳定。

AsyncIO 引入了协程(coroutine)的概念。协程可以看做是用户态的线程,它由应用本身来调度运行,而不是系统内核。协程的开销远低于线程,普通的机器上都可以运行成千上万的协程。

下面的代码把 main 函数定义为了一个协程:

import asyncio

async def main():
    print('hello')
    await asyncio.sleep(1)
    print('world')

相比普通的函数定义,前面多了一个 async 关键字。有了这个关键字,就可以在函数内部使用 await 关键字,await 用来等待其它的协程运行完成。与普通的 main 函数不同,如果我们执行它,将得到一个协程对象,函数体并没有被运行。注意协程这个词有两种含义,一种是协程定义,另一种是创建完成的协程对象。

那么如何执行协程代码了?可以使用 asyncio.run 函数,它会创建一个 loop,然后把协程注册到这个 loop 里去运行,待协程运行结束后再关闭 loop。loop 的作用是调度协程运行,一个 loop 里可以注册很多协程。当某个协程需要等待外部 IO 完成时会主动挂起(通过 await 表明)自己,让出 CPU 给其它等待运行的协程。待外部 IO 完成时,loop 会收到事件通知,原先被挂起的协程将变成待运行状态,等待其它协程让出 CPU 后运行。如果有多个协程等待运行,那么 loop 会采用某种调度算法从中选择一个来运行。

asyncio.run(main())

更多有关协程的介绍和使用,可以参阅 Python 官方文档 asyncio

Sanic Web 框架

随着 Python asyncio 模块的推出,出现了一些支持异步 IO 的 Web 框架,其中最受欢迎的就是 Sanic,其 GitHub Star 数目前已经过万。Sanic 最新版本要求使用 Python 3.6+ 版本,其底层使用了性能更好的 uvloop 来替换 asyncio 默认的事件循环。uvloop 是 Cython 写的,构建于 libuv 之上,相比其它事件循环,速度差不多要快一倍,几乎接近于 Go 的速度。Sanic 在使用体验上非常类似于 Flask 框架,比如请求对象和响应对象的属性和方法、请求处理器、路由、蓝图、异常处理等,熟悉 Flask 框架的同学可以无缝切入。

异步访问后端服务

使用异步 IO 的好处是为了在某个任务执行 IO(读写磁盘和网络数据)操作的时候能够把 CPU 释放出来给其它任务,而不是让 CPU 空闲等待,这样可以大大提升 CPU 的使用率。对于需要大量读写数据库和缓存等后端服务的应用,仅仅在读写请求数据的时候为异步还不够,读写后端服务数据同样需要为异步,否则 CPU 在等待后端服务处理时仍然会被空闲。可以使用 AIOMySQLAIORedis 这样的异步 IO 库来分别访问 MySQL 和 Redis 服务。

另外本项目还使用了 Fire 框架来编写命令行应用,以及 APScheduler 任务管理器来定时运行异步任务。

开发实战

本项目要开发的是 围观 APP 的后端 API 服务的精简版,本文仅摘取部分示例,完整代码可从 GitHub 仓库 Sanic in Practice 获取。

目录结构

为了便于理解代码,首先来看一下完整的项目目录结构。

.
├── Dockerfile # Dokcer 镜像构建文件
├── README.md
├── docker-compose.yml # Docker Compose 配置文件
├── requirements.txt # PIP 依赖包
├── venv # Python 虚拟环境
└── weiguan # Python 顶层包
    ├── cli # 命令行应用
    │   ├── app.py # 应用入口
    │   └── commands # 子命令
    ├── config # 配置
    ├── container.py # IoC 容器
    ├── dependencies # 外部依赖,包括数据库、缓存等内部服务,以及第三方服务
    ├── entities # 业务实体
    ├── services # 业务逻辑
    ├── utils # 工具类函数和类
    └── web # Web 应用
        ├── app.py # 应用入口
        └── blueprints # Sanic 蓝图

本项目代码采用了干净架构,以避免核心业务逻辑跟外部依赖紧耦合,这样能够保证业务逻辑的稳定性和可测试性。如果对干净架构不太了解的,可先阅读此文 干净架构在 Web 服务开发中的实践。另外为了集中管理应用内对象的创建及其依赖关系,还使用了依赖注入框架 Dependency Injector 来实现了 IoC(控制反转)思想。IoC 能够简化复杂应用里的业务对象管理,也有利于代码维护。如想了解更多有关控制反转和依赖注入的内容,可阅读此文 使用 IoC 容器来简化业务对象的管理

创建虚拟环境

为了避免同一主机不同项目之间出现 PIP 包版本冲突,最好为每个项目创建一个独立的 Python 环境。Python 3.3+ 自带的 venv 模块就是用来管理虚拟环境的,项目的虚拟环境可以创建在项目根目录下。由于虚拟环境是跟本地 Python 绑定的,所以不能提交到 Git 仓库,需要把它加入到 .gitignore 文件里。

进入项目根目录,执行下面的命令来创建一个名为 venv 的虚拟环境。

python3 -m venv venv

定义业务实体

业务实体在干净架构中处于最内层,所有其它外层都可以依赖它。由于 Python 是一门弱类型语言,开发效率优先,很多时候大家都直接使用 dict 来表示数据对象,因此不需要额外去定义业务实体类型。本项目中在业务实体里定义了一些枚举类型和从数据库查询出来的对象的序列化器,以及 WebSocket 消息类型等。

从数据库查询出来的对象,如果要通过 API 响应给客户端,需要进行序列化。序列化通常都采用 JSON 格式,不过并不是所有的 Python 类型都支持 JSON 序列化,比如 datetime 类型,即便支持可能默认的格式也不是我们想要的。有时我们还希望对序列化后的 JSON 对象属性进行裁剪和转换,比如删除某些需要保密的字段。使用 Marshmallow 这个库,可以允许我们自由定义序列化后的 JSON 对象格式,同时还能避免对外输出对象的结构跟内部对象的结构紧耦合。下面是用户序列化器的定义。

weiguan/entities/user.py

from marshmallow import Schema, fields


class UserSchema(Schema):
    id = fields.Integer()
    username = fields.String()
    mobile = fields.String()
    email = fields.String()
    avatarId = fields.Integer(attribute='avatar_id')
    intro = fields.String()
    createdAt = fields.DateTime(attribute='created_at')
    updatedAt = fields.DateTime(attribute='updated_at')

    avatar = fields.Nested('FileSchema')

定义表仓库

表仓库(Repository)负责将数据对象持久化保存到数据库的表中,以及从表中加载数据对象出来,其中不包含任何业务逻辑。由于其负责跟外部数据库服务打交道,因此将其放在 dependencies 目录下。我们使用 AIOMySQL 来访问 MySQL 数据库,它基于 PyMySQL,提供了类似的 API,同时它还支持 SQLAlchemy。使用 SQLAlchemy 可以避免裸写 SQL,方便的同时还能避免 SQL 注入攻击,因此我们将使用 SQLAlchemy 方式来访问数据库。下面是用户表仓库定义。

weiguan/dependencies/repositories/user.py

import sqlalchemy as sa
import sqlalchemy.sql as sasql
from aiomysql.sa import Engine

from ...utils import LocalDateTime
from .common import metadata, Repository

user_table = sa.Table(
    'user', metadata,
    sa.Column('id', sa.Integer, nullable=False, primary_key=True,
              comment='ID'),
    sa.Column('username', sa.VARCHAR(20), nullable=False, comment='用户名'),
    sa.Column('password', sa.CHAR(64), nullable=False, comment='已加密的密码'),
    sa.Column('salt', sa.CHAR(64), nullable=False, comment='密钥'),
    sa.Column('mobile', sa.CHAR(11), nullable=True, comment='手机号'),
    sa.Column('email', sa.VARCHAR(50), nullable=True, comment='邮箱'),
    sa.Column('avatar_id', sa.Integer, nullable=True, comment='头像文件 ID'),
    sa.Column('intro', sa.VARCHAR(100), nullable=False, server_default='',
              comment='自我介绍'),
    sa.Column('created_at', LocalDateTime, nullable=False,
              server_default=sasql.text('CURRENT_TIMESTAMP'),
              comment='创建时间'),
    sa.Column('updated_at', LocalDateTime, nullable=False,
              server_default=sasql.text(
                  'CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP'),
              comment='更新时间'),
    sa.Index('idx_username', 'username', unique=True),
    sa.Index('idx_mobile', 'mobile', unique=True),
    sa.ForeignKeyConstraint(['avatar_id'], ['file.id'], ondelete='SET NULL',
                            onupdate='CASCADE'),
    comment='用户',
)


class UserRepo(Repository):
    def __init__(self, db: Engine):
        super().__init__(db, user_table)

其中 user_table 为 SQLAlchemy 的表对象,我们使用它来定义表结构,以及构建 SQL 语句。每个表对象都有一个对应的 Repo 类,封装了对该表的一些操作。为了避免代码冗余,定义了 Repository 这个表仓库基类,它提供了增删改查等基本操作。外层应尽量使用可控的 Repo 类来访问表,避免直接使用底层的表对象。Repository 基类代码如下。

weiguan/dependencies/repositories/common.py

import sqlalchemy as sa
import sqlalchemy.sql as sasql
from aiomysql.sa import Engine, SAConnection

metadata = sa.MetaData()


class Repository:
    def __init__(self, db: Engine, table: sa.Table):
        self.db = db
        self.table = table

    @property
    def conn(self):
        return self.db.acquire()

    async def execute(self, sm):
        conn: SAConnection
        async with self.conn as conn:
            result = await conn.execute(sm)

        return result

    async def create(self, **data):
        result = await self.execute(sasql.insert(self.table).values(**data))
        id = result.lastrowid

        return await self.info(id)

    async def delete(self, column_value, column_name='id'):
        result = await self.execute(
            sasql.delete(self.table)
            .where(self.table.c[column_name] == column_value))

        return result.rowcount

    async def modify(self, column_value, column_name='id', **data):
        data = {k: v for k, v in data.items() if v is not None}

        await self.execute(
            sasql.update(self.table)
            .where(self.table.c[column_name] == column_value)
            .values(**data))

        return await self.info(column_value, column_name)

    async def info(self, column_value, column_name='id'):
        if column_value is None:
            return None

        result = await self.execute(
            self.table.select()
            .where(self.table.c[column_name] == column_value))
        row = await result.first()

        return None if row is None else dict(row)

    async def infos(self, column_values, column_name='id'):
        valid_values = [v for v in column_values if v is not None]
        if valid_values:
            result = await self.execute(
                self.table.select()
                .where(self.table.c[column_name].in_(valid_values)))
            d = {v[column_name]: dict(v) for v in await result.fetchall()}
        else:
            d = {}

        return [d.get(v) for v in column_values]

    async def list(self, *, from_=None, where=None, order_by=None, limit=None,
                   offset=None):
        select_sm = self.table.select()
        count_sm = sasql.select([sasql.func.count()]).select_from(self.table)

        if from_ is not None:
            select_sm = select_sm.select_from(from_)
            count_sm = count_sm.select_from(from_)

        if where is not None:
            select_sm = select_sm.where(where)
            count_sm = count_sm.where(where)

        if order_by is not None:
            select_sm = select_sm.order_by(order_by)

        if limit is not None:
            select_sm = select_sm.limit(limit)
        if offset is not None:
            select_sm = select_sm.offset(offset)

        result = await self.execute(select_sm)
        rows = [dict(v) for v in await result.fetchall()]

        result = await self.execute(count_sm)
        total = await result.scalar()

        return (rows, total)

    async def count(self, where=None):
        sm = sasql.select([sasql.func.count()]).select_from(self.table)
        if where is not None:
            sm = sm.where(where)
        result = await self.execute(sm)

        return await result.scalar()

编写表模型管理命令

有了表模型(仓库)之后,就可以利用它来在数据库里创建表,而不用再去编写建表的 SQL,这也是使用 SQLAlchemy 的好处之一。可以打开 Python 的交互式命令行来执行建表语句,但在建表之前需要导入一些依赖包,并执行一些初始化工作,比如加载应用配置、创建数据库连接等。这种方式适合只执行一次的临时操作,如果一个操作要重复执行多次,最好将它实现为一条命令。

得益于 Python 强大的动态性,有许多功能强大的 Python 命令行应用框架,比如 Google 出品的 Fire。使用它,只需一行代码就可以把一个函数、一个类的所有方法或者一个模块的所有函数暴露为可在命令行执行的命令。我们将使用它来编写模型管理命令,其中包含建表的子命令,后面还会编写定时任务管理命令。Fire 支持多层级的命令结构,为了方便记忆和管理,项目的所有命令都通过 manage 这个入口模块来访问。manage 模块很简单,因为大部分的初始化工作都在 IoC 容器里完成了,包括加载配置、创建数据库连接池、创建服务对象等。

weiguan/cli/app.py

import asyncio

import fire
import uvloop

from ..config import config, log_config
from ..container import Container


async def init_container():
    container = Container(config, log_config)
    await container.on_init

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(init_container())

    container = Container()
    fire.Fire(container.root_command)

    loop.run_until_complete(container.clean())

由于 Fire 不支持协程,但 IoC 容器的初始化是一个异步任务,所以需要先等待初始化任务执行完成,再执行 Fire 的初始化。首先调用 asyncio.get_event_loop 获取到当前线程的 loop(如果没有会自动创建一个),然后调用其 run_until_complete 方法来运行协程并等待其运行结束。初始化完成之后,就可以调用 fire.Fire 方法来将 RooCommand 对象暴露为根命令。下面是模型管理子命令,定时任务子命令稍后会创建。

weiguan/cli/commands/model.py

import sqlalchemy as sa

from ...dependencies import metadata


class ModelCommand:
    def __init__(self, config: dict):
        self.config = config

        self.engine = sa.create_engine(
            'mysql://{}:{}@{}:{}/{}?charset=utf8mb4'.format(
                config['MYSQL_USER'], config['MYSQL_PASSWORD'],
                config['MYSQL_HOST'], config['MYSQL_PORT'],
                config['MYSQL_DB']))

    def create_tables(self, tables=None):
        if isinstance(tables, str):
            tables = [tables]

        metadata.create_all(self.engine, tables)

如果要创建所有表,可以执行如下命令,已经存在的表会忽略。

python -u -m weiguan.cli.app model create_tables

实现业务逻辑

业务逻辑代码是应用的核心,按功能模块进行划分,都放在 services 目录下。以用户模块为例,包含了注册、登录、修改、查询单个等接口。

weiguan/services/user.py

import string

import sqlalchemy.sql as sasql

from ..utils import random_string, sha256_hash
from ..dependencies import UserRepo, UserFollowRepo


class UserService:
    _mobile_verify_codes = {}
    _email_verify_codes = {}

    def __init__(self, config: dict, user_repo: UserRepo,
                 user_follow_repo: UserFollowRepo):
        self.config = config
        self.user_repo = user_repo
        self.user_follow_repo = user_follow_repo

    async def create_user(self, **data):
        data['salt'] = random_string(64)
        data['password'] = sha256_hash(data['password'], data['salt'])

        return await self.user_repo.create(**data)

    async def modify_user(self, id, **data):
        if data.get('password') is not None:
            user = self.info(id)
            data['password'] = sha256_hash(data['password'], user['salt'])

        return await self.user_repo.modify(id, **data)

    async def info(self, id):
        return await self.user_repo.info(id)

    async def info_by_username(self, username):
        return await self.user_repo.info(username, 'username')

    async def info_by_mobile(self, mobile):
        return await self.user_repo.info(mobile, 'mobile')

    async def infos(self, ids):
        return await self.user_repo.infos(ids)

    async def list(self, *, limit=None, offset=None):
        return await self.user_repo.list(limit=limit, offset=offset)

处理请求

考虑到以后接口可能会很多,为了代码的可维护性,有必要使用蓝图(blueprint)来按功能模块对接口进行分组。蓝图都放在 blueprints 目录下,一个蓝图下的接口拥有相同的路径前缀。以帐号蓝图为例,其下包含了登录、注册、退出等跟帐号管理相关的接口。

weiguan/web/blueprints/account.py

from sanic import Blueprint

from ...utils import sha256_hash
from ...container import Container
from ...entities import UserSchema
from .common import response_json, ResponseCode, authenticated, dump_user_info

account = Blueprint('account', url_prefix='/account')


@account.post('/register')
async def register(request):
    data = request.json
    username = data['username']
    password = data['password']

    user_service = Container().user_service
    user = await user_service.create_user(username=username, password=password)

    request['session']['user'] = await dump_user_info(user)

    return response_json(user=request['session']['user'])


@account.post('/login')
async def login(request):
    data = request.json
    username = data.get('username')
    mobile = data.get('mobile')
    password = data['password']

    user_service = Container().user_service
    if username is not None:
        user = await user_service.info_by_username(username)
    elif mobile is not None:
        user = await user_service.info_by_mobile(mobile)
    else:
        user = None

    if (user is None or
            sha256_hash(password, user['salt']) != user['password']):
        return response_json(ResponseCode.FAIL, '帐号或密码错误')

    request['session']['user'] = await dump_user_info(user)

    return response_json(user=request['session']['user'])


@account.get('/logout')
async def logout(request):
    user = request['session'].pop('user', None)

    return response_json(user=user)


@account.get('/info')
async def info(request):
    user = request['session'].get('user')

    return response_json(user=user)


@account.post('/modify')
@authenticated()
async def modify(request):
    user_id = request['session']['user']['id']

    data = request.json
    username = data.get('username')
    password = data.get('password')
    mobile = data.get('mobile')
    email = data.get('email')
    avatar_id = data.get('avatarId')
    intro = data.get('intro')
    code = data.get('code')

    user_service = Container().user_service

    if ((mobile is not None and
            not (await user_service.check_mobile_verify_code('modify', mobile, code))) or
            (email is not None and
             not (await user_service.check_email_verify_code('modify', email, code)))):
        return response_json(ResponseCode.FAIL, '验证码错误')

    user = await user_service.modify_user(
        user_id, username=username, password=password, mobile=mobile,
        email=email, avatar_id=avatar_id, intro=intro)

    request['session']['user'] = await dump_user_info(user)

    return response_json(user=request['session']['user'])

应用入口

所有组件都准备妥当之后,我们需要有一个入口模块来将这些组件集成起来,并启动我们的应用。

weiguan/web/app.py

import os

from sanic import Sanic
from sanic_session import Session, AIORedisSessionInterface

from ..config import config, log_config
from ..container import Container
from .blueprints import handle_exception, account, message, post, storage, user

os.makedirs(config['DATA_PATH'], 0o755, True)

app = Sanic(config['NAME'].capitalize(), log_config=log_config)
app.config.update(config)

app.error_handler.add(Exception, handle_exception)

app.static('/files', os.path.join(config['DATA_PATH'], config['UPLOAD_DIR']),
           stream_large_files=True)

app.blueprint(account)
app.blueprint(message)
app.blueprint(post)
app.blueprint(storage)
app.blueprint(user)


@app.listener('before_server_start')
async def server_init(app, loop):
    container = Container(config, log_config)
    await container.on_init

    Session(app, AIORedisSessionInterface(
        container.cache, expiry=config['SESSION_EXPIRY']))


@app.listener('after_server_stop')
async def server_clean(app, loop):
    await Container().clean()

if __name__ == '__main__':
    app.run(host=config['HOST'], port=config['PORT'], debug=config['DEBUG'],
            auto_reload=config['AUTO_RELOAD'], access_log=config['ACCESS_LOG'],
            workers=config['WORKERS'])

入口模块完成了以下这些工作:

  1. 加载应用配置
  2. 创建 Sanic 应用
  3. 设置全局异常处理
  4. 注册各个蓝图
  5. 初始化 IoC 容器
  6. 停止应用时清理资源
  7. 最后启动应用,等待请求到来

执行定时任务

有的时候需要在后台定时执行一些任务,比如发送订阅邮件、计算统计数据等。传统的方式是使用 Crontab,但这个需要修改操作系统配置,不方便管理。特别是现在的应用大多使用 Docker,甚至是 Serverless 方式来部署,服务器对它们来说是透明的。Python 已经有不少成熟的定时任务管理系统,但大多不支持异步任务,因此我们选择了支持异步任务运行的 APScheduler。同样我们需要编写一个管理命令来启动定时任务。

weiguan/cli/commands/schedule.py

from logging import Logger
import asyncio
from datetime import datetime, time, timedelta

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.events import EVENT_JOB_ERROR, JobExecutionEvent

from ...services import StatService


class ScheduleCommand:
    def __init__(self, config: dict, logger: Logger, stat_service: StatService):
        self.config = config
        self.logger = logger
        self.stat_service = stat_service

    async def stat_user(self, user_id=None):
        if user_id is None:
            await self.stat_service.stat_all_users()
        else:
            user_stat = await self.stat_service.stat_user(user_id)
            print(user_stat)

    async def stat_post(self, post_id=None):
        if post_id is None:
            await self.stat_service.stat_all_posts()
        else:
            post_stat = await self.stat_service.stat_post(post_id)
            print(post_stat)

    def error_listener(self, event):
        if isinstance(event, JobExecutionEvent):
            self.logger.error(repr(event.exception))
        else:
            self.logger.error(repr(event))

    def run(self, task=None, *args, **kwargs):
        if task is None:
            scheduler = AsyncIOScheduler()

            scheduler.add_job(self.stat_user, 'interval', minutes=1)
            scheduler.add_job(self.stat_post, 'interval', minutes=1)

            scheduler.add_listener(self.error_listener, EVENT_JOB_ERROR)

            scheduler.start()

            try:
                asyncio.get_event_loop().run_forever()
            except (KeyboardInterrupt, SystemExit):
                pass
        else:
            method = getattr(self, task)
            asyncio.get_event_loop().run_until_complete(method(*args, **kwargs))

上面的定时任务管理器会每隔 1 分钟计算所有用户和动态的统计数据,其启动方式如下。

python -u -m weiguan.cli.app schedule run

部署

整个项目需要部署的服务包括应用服务、定时任务服务、MySQL 服务和 Redis 服务,如果采用传统方式,部署需要不少时间,特别是安装和配置 MySQL。但现在我们无需如此,使用 Docker 可以很快地部署好这些服务。

首先构建应用镜像。应用服务和定时任务可以共用一个镜像,默认启动应用服务,定时任务可以在启动容器时修改启动命令来运行。

Dockerfile

FROM python:3.7

WORKDIR /app

COPY sources.list /etc/apt/
COPY pip.conf /etc/
COPY requirements.txt .
RUN pip install -r requirements.txt

COPY . .

EXPOSE 8000

CMD ["python", "-u", "-m", "weiguan.web.app"]

这里我们先拷贝了 requirements.txt 文件,再拷贝剩余文件,而不是一次拷贝所有文件。这是为了充分利用 Docker 构建缓存,只要 requirements.txt 文件不发生变化,后面的 pip install 命令就不会重新执行,而是利用上次构建的缓存,这样可以大大节省构建时间。

在项目根目录下执行以下命令来构建镜像。

docker build -t weiguan .

然后使用 Docker Compose 来一次启动所有服务。

docker-compose.yml

version: '3'
services:
  server:
    image: weiguan
    environment:
      TZ: Asia/Shanghai
      WG_DATA_PATH: /data
      WG_DEBUG: 'false'
      WG_AUTO_RELOAD: 'false'
      WG_MYSQL_HOST: mysql
      WG_MYSQL_PORT: 3306
      WG_MYSQL_DB: weiguan
      WG_MYSQL_USER: weiguan
      WG_MYSQL_PASSWORD: jwcourse.com
      WG_REDIS_URI: redis://@redis:6379/0
    ports:
      - 8001:8000
    volumes:
      - ~/data/weiguan-demo/server:/data
  scheduler:
    image: weiguan
    command: python -u -m weiguan.cli.app schedule run
    environment:
      TZ: Asia/Shanghai
      WG_DATA_PATH: /data
      WG_DEBUG: 'false'
      WG_AUTO_RELOAD: 'false'
      WG_MYSQL_HOST: mysql
      WG_MYSQL_PORT: 3306
      WG_MYSQL_DB: weiguan
      WG_MYSQL_USER: weiguan
      WG_MYSQL_PASSWORD: jwcourse.com
      WG_REDIS_URI: redis://@redis:6379/0
    volumes:
      - ~/data/weiguan-demo/scheduler:/data
  mysql:
    image: mysql:5.7
    environment:
      TZ: Asia/Shanghai
      MYSQL_ROOT_PASSWORD: jwcourse.com
    volumes:
      - ~/data/weiguan-demo/mysql:/var/lib/mysql
  redis:
    image: redis:5
    environment:
      TZ: Asia/Shanghai
    volumes:
      - ~/data/weiguan-demo/redis:/data
            

在项目根目录下执行以下命令来启动所有服务。

docker-compose up

如果需要停止所有服务,可以执行 docker-compose down。Docker 会为项目自动创建一个网络,并且把项目里的所有容器都加入这个网络,因此项目的各个容器之间可以互相访问。

异步 IO 不是万能的

没有什么技术是万能的,异步 IO 同样如此。虽然 asyncio 已经大大简化了异步 IO 编程,但开发者仍然需要在每个发生 IO 的地方使用 await 关键字来显式让出 CPU。对于 IO 密集型应用,代码里将到处充斥着 async/await 关键字。另外,虽然可以同时存在多个协程,但某一时刻只能有一个协程被运行,只能做到并发而不是并行,除非使用多进程(Python 的多线程有全局锁限制无法并行运行)。所有基于事件循环机制来调度协程的语言都有这个问题,包括 Node.js、PHP、Ruby 等。

如果是计算密集型的应用,建议使用像 Go 这样的真正支持并行执行的语言。Go 通过 goroutine 来支持并行执行,可以充分利用多核性能,并且编写代码时无需使用 async/await 这样的关键字。不过大多数 Web 服务都是 IO 密集型的,大量的计算会交给后端的服务,比如数据库,去完成,使用 Python 这样的动态解释型语言是可行的,并且开发效率比 Go 这样的静态编译型语言要高。

总之,没有最好的语言,只有最适合的语言。

参考资料

  1. Sanic in Practice
  2. Sanic
  3. AsyncIO
  4. Uvloop
  5. SQLAlchemy
  6. AIOMySQL
  7. AIORedis
  8. Fire
  9. APScheduler
  10. Marshmallow
  11. Dependency Injector
  12. 干净架构在 Web 服务开发中的实践
  13. 使用依赖注入来简化业务对象管理