7.1 为什么需要

我们已经详细讨论了 ACID 的所有属性、它们的用途和用例。如您所见,并非所有数据库都提供 ACID 保证,为了更好的性能而牺牲它们。因此,很可能在您的项目中选择了不提供 ACID 的数据库,并且您可能需要在应用程序端实现一些必要的 ACID 功能。而且,如果您的系统被设计为微服务或某种其他类型的分布式应用程序,那么一个服务中的正常本地事务现在将变成分布式事务——当然,将失去其 ACID 特性,即使数据库每个单独的微服务都是 ACID。

我不想为您提供有关如何创建事务管理器的详尽指南,只是因为它太大太复杂,我只想介绍一些基本技术。如果我们不是在谈论分布式应用程序,那么如果您需要 ACID 保证,我认为没有理由尝试在应用程序端完全实现 ACID - 毕竟,采用现成的解决方案在任何意义上都会更容易和更便宜(即,具有 ACID 的数据库)。

但我想向您展示一些可以帮助您在应用程序端进行交易的技术。毕竟,了解这些技术可以在各种场景中为您提供帮助,甚至是那些不一定涉及事务的场景,并使您成为更好的开发人员(我希望如此)。

7.2 交易爱好者的基本工具

乐观和悲观阻塞。这是对某些可以同时访问的数据的两种类型的锁。

乐天派假设并发访问的可能性不是很大,因此它执行以下操作:读取所需的行,记住其版本号(或时间戳,或校验和/哈希 - 如果您无法更改数据模式并为版本添加列或时间戳),并且在将此数据的更改写入数据库之前,它会检查此数据的版本是否已更改。如果版本已更改,则您需要以某种方式解决创建的冲突并更新数据(“提交”),或回滚事务(“回滚”)。这种方法的缺点是为长名称“time-of-check to time-of-use”的bug创造了有利条件,缩写为TOCTOU:状态可能在check和write之间的时间段发生变化。我没有乐观锁定的经验,

例如,我从开发人员的日常生活中发现了一种使用乐观锁定之类的技术——这就是 HTTP 协议。对初始 HTTP GET 请求的响应可以包括用于来自客户端的后续 PUT 请求的 ETag 标头,客户端可以在 If-Match 标头中使用它。对于 GET 和 HEAD 方法,服务器只有在与它知道的 ETag 之一匹配时才会发回请求的资源。对于 PUT 和其他不安全的方法,它也只会在这种情况下加载资源。如果您不知道 ETag 的工作原理,这里有一个使用“feedparser”库(帮助解析 RSS 和其他提要)的好例子。


>>> import feedparser 
>>> d = feedparser.parse('http://feedparser.org/docs/examples/atom10.xml') 
>>> d.etag 
'"6c132-941-ad7e3080"' 
>>> d2 = feedparser.parse('http://feedparser.org/docs/examples/atom10.xml', etag=d.etag) 
>>> d2.feed 
{} 
>>> d2.debug_message 
'The feed has not changed since you last checked, so the server sent no data.  This is a feature, not a bug!' 

另一方面,悲观主义者的出发点是交易经常会在相同的数据上“相遇”,为了简化他的生活并避免不必要的竞争条件,他只是简单地阻止了他需要的数据为了实现锁定机制,您需要为您的会话维护一个数据库连接(而不是从池中拉取连接——在这种情况下您很可能必须使用乐观锁定),或者为事务使用一个 ID ,无论连接如何都可以使用。悲观锁定的缺点是它的使用通常会减慢事务的处理速度,但您可以对数据保持冷静并获得真正的隔离。

然而,另一个危险潜伏在可能的死锁中,其中多个进程等待彼此锁定的资源。例如,一个事务需要资源A和B,进程1已经占用了资源A,进程2已经占用了资源B,这两个进程都无法继续执行。有多种方法可以解决这个问题——我现在不想详述,所以先阅读维基百科,但简而言之,有创建锁层次结构的可能性。如果你想更详细地了解这个概念,那么请你绞尽脑汁研究“Dinning Philosophers Problem”(“哲学家用餐问题”)。

是一个很好的例子,说明两个锁在同一场景中的行为方式。

关于锁的实现。我不想细说,但是有分布式系统的锁管理器,例如:ZooKeeper、Redis、etcd、Consul。

7.3 操作的幂等性

幂等代码通常是一种很好的做法,这正是开发人员能够做到这一点的最佳做法,无论他是否使用事务。幂等性是操作的属性,当该操作再次应用于对象时会产生相同的结果。该函数被调用 - 给出了结果。一秒或五秒后再次调用 - 给出相同的结果。当然,如果数据库中的数据发生了变化,结果就不一样了。第三方系统中的数据可能不依赖于函数,但任何依赖于函数的东西都必须是可预测的。

幂等性可以有多种表现形式。其中之一只是关于如何编写代码的建议。你还记得最好的函数是只做一件事的函数吗?为这个功能编写单元测试有什么好处呢?如果你遵守这两条规则,那么你已经增加了你的函数是幂等的机会。为避免混淆,我将澄清幂等函数不一定是“纯”的(在“函数纯度”的意义上)。纯函数是那些只对它们在输入端接收到的数据进行操作的函数,不会以任何方式更改它们并返回处理后的结果。这些函数允许您使用函数式编程技术扩展您的应用程序。由于我们正在谈论一些通用数据和数据库,因此我们的功能不太可能是纯粹的,

这是一个纯函数:


def square(num: int) -> int: 
	return num * num 

但是这个函数并不纯粹,而是幂等的(请不要从这些片段中得出我是如何写代码的结论):


def insert_data(insert_query: str, db_connection: DbConnectionType) -> int: 
  db_connection.execute(insert_query) 
  return True 

不用多说,我可以谈谈我是如何被迫学习如何编写幂等程序的。我用 AWS 做了很多工作,正如你现在所看到的,还有一项名为 AWS Lambda 的服务。Lambda 允许您无需照管服务器,而只需加载将响应某些事件或根据时间表运行的代码。事件可以是由消息代理传送的消息。在 AWS 中,这个代理是 AWS SNS。我认为即使对于那些不使用 AWS 的人来说,这也应该很清楚:我们有一个通过通道(“主题”)发送消息的代理,订阅这些通道的微服务接收消息并以某种方式对它们做出反应。

问题是 SNS 传递消息“至少一次”(“at-least-once delivery”)。这是什么意思?您的 Lambda 代码迟早会被调用两次。它确实发生了。有很多场景需要你的函数是幂等的:例如,当从一个账户中取款时,我们可以预期某人取款两次相同的金额,但我们需要确保这真的是 2 个独立的时间 -换句话说,这是两个不同的交易,而不是一个的重复。

作为改变,我将举另一个例子——限制对 API 的请求频率(“速率限制”)。我们的 Lambda 收到一个带有特定 user_id 的事件,应该检查该 ID 的用户是否已经用尽了他对我们某些 API 的可能请求数。我们可以将调用的值从 AWS 存储在 DynamoDB 中,并在每次调用我们的函数时将其增加 1。

但是如果这个 Lambda 函数被同一个事件调用两次怎么办?对了,你有没有注意lambda_handler()函数的参数。第二个参数,AWS Lambda 中的上下文是默认给定的,包含各种元数据,包括为每个唯一调用生成的 request_id。这意味着现在,我们可以存储一个 request_id 列表,而不是在表中存储调用次数,并且在每次调用时,我们的 Lambda 将检查给定的请求是否已被处理:

import json
import os
from typing import Any, Dict

from aws_lambda_powertools.utilities.typing import LambdaContext  # needed only for argument type annotation
import boto3

limit = os.getenv('LIMIT')

def handler_name(event: Dict[str: Any], context: LambdaContext):

	request_id = context.aws_request_id

	# We find user_id in incoming event
	user_id = event["user_id"]

	# Our table for DynamoDB
	table = boto3.resource('dynamodb').Table('my_table')

	# Doing update
	table.update_item(
    	Key={'pkey': user_id},
    	UpdateExpression='ADD requests :request_id',
    	ConditionExpression='attribute_not_exists (requests) OR (size(requests) < :limit AND NOT contains(requests, :request_id))',
    	ExpressionAttributeValues={
        	':request_id': {'S': request_id},
        	':requests': {'SS': [request_id]},
        	':limit': {'N': limit}
    	}
	)

	# TODO: write further logic

	return {
    	"statusCode": 200,
    	"headers": {
        	"Content-Type": "application/json"
    	},
    	"body": json.dumps({
        	"status ": "success"
    	})
	}

由于我的示例实际上是从 Internet 上获取的,因此我会留下原始来源的链接,尤其是因为它提供了更多信息。

还记得我在上面提到过类似唯一事务 ID 的东西可以用来锁定共享数据吗?我们现在了解到它也可以用来使操作幂等。让我们找出您可以自己生成此类 ID 的方法。