一、介绍
数据库事务是实现业务逻辑的 SQL 命令链。例如,在电子商务应用程序中,填写客户订单所需的 SQL 命令可能会影响 、 和 表。数据库事务解决了原子性原则,该原则指出事务在数据库中应具有全有或全无影响。如果事务中的任何 SQL 命令失败,数据库应删除(回滚)整个事务。PostgreSQL 是最受欢迎的数据库服务器之一,它支持事务以消除部分数据库更新的可能性。sales_orders
sales_order_products
sales_payments
本指南向您展示如何使用 PostgreSQL 实现 PostgreSQL 事务,这是一个用于连接到 PostgreSQL 服务器的高级 Python 库。psycopg2
二、准备工作
要完成本指南,请执行以下操作:
- 部署 Ubuntu 20.04 服务器。
- 创建一个非根
sudo
用户。 - 预配托管的 PostgreSQL 数据库群集。
- 找到 PostgreSQL 数据库集群的连接详细信息,位于“概述”选项卡下。本指南使用以下示例连接详细信息:
- 用户名:
vultradmin
- 密码:
EXAMPLE_POSTGRESQL_PASSWORD
- 主持人:
SAMPLE_POSTGRESQL_DB_HOST_STRING.vultrdb.com
- 端口:
16751
- 用户名:
三、设置示例数据库
此示例数据库是存储客户及其贷款余额的银行应用程序的后端。
此应用程序使用两个表来完成数据库事务。该表存储客户的名称。然后,该表存储客户的贷款余额。稍后,本指南将介绍如何使用 Linux 命令将示例事务发送到应用程序。应用程序必须将事务作为单个工作单元完成,才能完成业务逻辑。否则,数据库应拒绝部分完成的事务。customers
loans
curl
若要设置此示例应用程序,需要包连接到托管 PostgreSQL 数据库群集并创建数据库。按照以下步骤安装软件包并初始化数据库:postgresql-client
- 更新服务器的软件包信息索引。
$ sudo apt update
- 使用该工具安装包。
apt
postgresql-client
$ sudo apt install -y postgresql-client
- 登录到托管的 PostgreSQL 数据库集群。替换为适用于您的数据库的正确版本。
SAMPLE_POSTGRESQL_DB_HOST_STRING.vultrdb.com
host
$ psql -h SAMPLE_POSTGRESQL_DB_HOST_STRING.vultrdb.com -p 16751 -U vultradmin defaultdb
输出:
Password for user vultradmin:
- 输入托管 PostgreSQL 数据库集群的密码,然后按继续。ENTER
输出:
defaultdb=>
- 发出以下 SQL 命令以创建示例数据库。
bank_db
defaultdb=> CREATE DATABASE bank_db;
输出:
CREATE DATABASE
- 连接到新数据库。
bank_db
defaultdb=> \c bank_db;
输出:
... You are now connected to database "bank_db" as user "vultradmin".
- 创建示例表。此表存储 、 和 。该关键字指示 PostgreSQL 服务器自动生成新的。
customers
customer_ids
first_names
last_names
SERIAL
customer_ids
bank_db=> CREATE TABLE customers ( customer_id SERIAL PRIMARY KEY, first_name VARCHAR(50), last_name VARCHAR(50) );
输出:
CREATE TABLE
- 创建表。此表存储客户持有的贷款帐户余额。此表中的列链接回表中的同一列。
loans
customer_id
customers
bank_db=> CREATE TABLE loans ( loan_id SERIAL PRIMARY KEY, customer_id BIGINT, amount DECIMAL(17, 4) );
输出:
CREATE TABLE
- 从托管的 PostgreSQL 数据库集群注销。
bank_db=> \q
- 按照下一步创建一个数据库类来访问您的示例 PostgreSQL 数据库。
四、 创建自定义的 PostgreSQL 数据库类
设置示例数据库后,现在需要一个连接到数据库的中心类来存储表中的数据。按照以下步骤创建类:
- 首先创建一个新目录,将源代码与系统文件分开。
project
$ mkdir project
- 导航到新目录。
project
$ cd project
- 在文本编辑器上打开一个新文件。
postgresql_db.py
$ nano postgresql_db.py
- 在文件中输入以下信息。请记住将数据库凭据(、、和)替换为 PostgreSQL 数据库集群的正确值。
postgresql_db.py
db_host
db_user
db_pass
db_port
import psycopg2 class PostgresqlDb: def __init__(self): db_host = 'SAMPLE_POSTGRESQL_DB_HOST_STRING.vultrdb.com' db_name = 'bank_db' db_user = 'vultradmin' db_pass = 'EXAMPLE_POSTGRESQL_PASSWORD' db_port = 16751 self.db_conn = psycopg2.connect(host = db_host, database = db_name, user = db_user, password = db_pass, port = db_port) def execute_db(self, json_payload): try: print("Starting new database transaction...") self.db_conn.autocommit = False self.cur = self.db_conn.cursor() print("Inserting new customer to database...") sql_string = 'insert into customers(first_name, last_name) values(%s, %s) RETURNING customer_id' self.cur.execute(sql_string, (json_payload['first_name'], json_payload['last_name'])) customer_id = self.cur.fetchone()[0] print("Customer successfully inserted to database, new customer_id is " + str(customer_id)) print("Inserting customer's loan record...") sql_string = 'insert into loans(customer_id, amount) values(%s, %s) RETURNING loan_id' self.cur.execute(sql_string, (customer_id, json_payload['loan_amount'])) loan_id = self.cur.fetchone()[0] print("Customer loan record inserted successfully, new loan_id is " + str(loan_id)) self.db_conn.commit() print("Database transaction completed successfully.") return "Success" except (Exception, psycopg2.DatabaseError) as error: print("Database transaction failed, rolling back database changes...") self.db_conn.rollback() return str(error) finally: if self.db_conn: self.cur.close() self.db_conn.close() print("Database connection closed successfully.")
- 保存并关闭文件。
postgresql_db.py
postgresql_db.py
文件解释道:
- 该语句从 Python 代码加载连接 PostgreSQL 数据库集群的适配器。
import psycopg2
psycopg2
- 该文件包含一个具有两个方法的类。
postgresql_db.py
PostgresqlDb
import psycopg2 class PostgresqlDb: def __init__(self): ... def execute_db(self, json_payload): ...
- 该方法是一个构造函数,每次从类创建新对象时都会触发。
__init__(...)
PostgresqlDb
- 该方法从包含客户名称和贷款余额的 HTTP 方法获取 JSON 有效负载,并将请求转发到 PostgreSQL 数据库。
execute_db(self, json_payload)
POST
- 在该方法下,您将 PostgreSQL 参数设置为 。此指令允许您使用命令永久提交成功的事务或命令来阻止部分事务。
execute_db(...)
autocommit
False
commit()
rollback()
... try: print("Starting new database transaction...") self.db_conn.autocommit = False self.cur = self.db_conn.cursor() ...
- 仅当数据库事务中没有错误时,才会触发以下代码块。在事务下,应用程序在表中创建新记录,在表中创建另一条记录。
customers
loans
... print("Inserting new customer to database...") sql_string = 'insert into customers(first_name, last_name) values(%s, %s) RETURNING customer_id' self.cur.execute(sql_string, (json_payload['first_name'], json_payload['last_name'])) customer_id = self.cur.fetchone()[0] print("Customer successfully inserted to database, new customer_id is " + str(customer_id)) print("Inserting customer's loan record...") sql_string = 'insert into loans(customer_id, amount) values(%s, %s) RETURNING loan_id' self.cur.execute(sql_string, (customer_id, json_payload['loan_amount'])) loan_id = self.cur.fetchone()[0] print("Customer loan record inserted successfully, new loan_id is " + str(loan_id)) self.db_conn.commit() print("Database transaction completed successfully.") return "Success" ...
- 当交易失败并出现异常时,该块将触发。然后,块在每种情况下执行以关闭游标和数据库连接。
except(...)
finally
... except (Exception, psycopg2.DatabaseError) as error: print("Database transaction failed, rolling back database changes...") self.db_conn.rollback() return str(error) finally: if self.db_conn: self.cur.close() self.db_conn.close() print("Database connection closed successfully.")
该课程现已准备就绪。使用以下语法将其包含在其他 Python 源代码文件中。PostgresqlDb
import postgresql_db
pg = postgresql_db.PostgresqlDb()
resp = pg.execute_db(...)
按照下一步为 Python 应用程序创建文件。main.py
五、创建应用程序的入口点
要完成此示例应用程序,您需要一个接受端口 上的传入请求的 HTTP 服务器。Python 有一些内置库,可用于执行任务。按照以下步骤创建 HTTP 服务器:POST
8080
- 在文本编辑器上打开一个新文件。
main.py
$ nano main.py
- 在文件中输入以下信息。
main.py
import http.server from http import HTTPStatus import socketserver import json import postgresql_db class httpHandler(http.server.SimpleHTTPRequestHandler): def do_POST(self): content_length = int(self.headers['Content-Length']) post_data = self.rfile.read(content_length) json_payload = json.loads(post_data) self.send_response(HTTPStatus.OK) self.send_header('Content-type', 'application/json') self.end_headers() pg = postgresql_db.PostgresqlDb() resp = pg.execute_db(json_payload) self.wfile.write(bytes( resp + '\r\n', "utf8")) httpServer = socketserver.TCPServer(('', 8080), httpHandler) print("Web server started at port 8080") try: httpServer.serve_forever() except KeyboardInterrupt: httpServer.server_close() print("The HTTP server is stopped.")
- 保存并关闭文件。
main.py
main.py
文件解释说:
- 本节加载示例应用程序所需的所有 Python 库。、 和库加载 HTTP 功能。该模块允许您在加载自定义 PostgreSQL 数据库类时使用 JSON 数据。
import
http.server
HTTPStatus
socketserver
json
postgresql_db
import http.server from http import HTTPStatus import socketserver import json import postgresql_db ...
- 是 HTTP 服务器的处理程序类。此类接受来自 HTTP 客户端的 JSON 有效负载。然后在此类下,and 语句调用自定义类以将数据保存到数据库,并使用该语句返回响应。
httpHandler
pg = postgresql_db.PostgresqlDb()
pg.execute_db(json_payload)
PostgresqlDb
self.wfile.write(bytes( resp + '\r\n', "utf8"))
... class httpHandler(http.server.SimpleHTTPRequestHandler): def do_POST(self): content_length = int(self.headers['Content-Length']) post_data = self.rfile.read(content_length) json_payload = json.loads(post_data) self.send_response(HTTPStatus.OK) self.send_header('Content-type', 'application/json') self.end_headers() pg = postgresql_db.PostgresqlDb() resp = pg.execute_db(json_payload) self.wfile.write(bytes( resp + '\r\n', "utf8")) ...
- 文件末尾的以下声明创建一个 Web 服务器,该服务器侦听 HTTP 请求并将请求调度到类。
httpHandler
... httpServer = socketserver.TCPServer(('', 8080), httpHandler) print("Web server started at port 8080") try: httpServer.serve_forever() except KeyboardInterrupt: httpServer.server_close() print("The HTTP server is stopped.")
现在,您拥有应用程序所需的所有必要的源代码文件。继续执行下一步以测试应用程序。
六、 测试应用程序
对所有 Python 文件进行编码后,最后一步是安装 Python 包、下载库并测试应用程序。请按照以下步骤完成申请:pip
psycopg2
- 安装 Python 包。
pip
$ sudo apt install -y python3-pip
- 使用该软件包为 PostgreSQL 服务器安装库。
pip
psycopg2-binary
$ pip install psycopg2-binary
输出:
... Installing collected packages: psycopg2-binary Successfully installed psycopg2-binary-2.9.5
- 使用该命令运行应用程序。
python3
$ python3 main.py
输出:
Web server started at port 8080
- 建立与服务器的另一个连接,并运行以下 Linux 命令,将示例 JSON 有效负载发送到应用程序。
SSH
curl
$ curl -X POST http://localhost:8080/ -H 'Content-Type: application/json' -d '{"first_name": "JOHN", "last_name": "DOE", "loan_amount": "4560"}'
输出:
"Success"
- 从运行 Web 服务器的第一个终端窗口查看下面的输出。事务成功,没有任何错误。
Web server started at port 8080 Starting new database transaction... Inserting new customer to database... Customer successfully inserted to database, new customer_id is 1 Inserting customer's loan record... Customer loan record inserted successfully, new loan_id is 1 Database transaction completed successfully. Database connection closed successfully.
- 尝试发送以下具有错误贷款金额的无效交易。而不是数值。
PP
$ curl -X POST http://localhost:8080/ -H 'Content-Type: application/json' -d '{"first_name": "JOHN", "last_name": "DOE", "loan_amount": "PP"}'
输出:
"invalid input syntax for type numeric: \"PP\"..."
- 检查第一个终端窗口的输出。这一次,事务将失败,而不对数据库进行任何更改。尽管应用程序将客户的详细信息插入数据库并获取新的 (),但整个事务将根据以下输出回滚。
customer_id
2
.. Starting new database transaction... Inserting new customer to database... Customer successfully inserted to database, new customer_id is 2 Inserting customer's loan record... Database transaction failed, rolling back database changes... Database connection closed successfully.
- 要验证更改,请登录到 PostgreSQL 数据库集群。
$ psql -h SAMPLE_POSTGRESQL_DB_HOST_STRING.vultrdb.com -p 16751 -U vultradmin defaultdb
输出:
Password for user vultradmin:
- 输入您的密码,然后按继续。ENTER
输出:
defaultdb=>
- 切换到数据库。
bank_db
defaultdb=> \c bank_db;
输出:
You are now connected to database "bank_db" as user "vultradmin".
- 查询表。
customers
defaultdb=> SELECT customer_id, first_name, last_name FROM customers;
输出:
customer_id | first_name | last_name -------------+------------+----------- 1 | JOHN | DOE (1 row)
- 查询表。
loans
defaultdb=> SELECT loan_id, customer_id, amount FROM loans;
输出:
loan_id | customer_id | amount ---------+-------------+----------- 1 | 1 | 4560.0000 (1 row)
上述输出确认应用程序的逻辑按预期工作。如果没有 PostgreSQL 事务逻辑,您现在应该有一个孤立的客户记录,而没有匹配的贷款记录。
七、结论
本指南向您展示如何在 Ubuntu 20.04 服务器上使用 Python 实现 PostgreSQL 数据库事务。使用本指南中的源代码创建将数据库工作单元视为一个整体的应用程序。事务可确保数据库一致性并防止可能出现的孤立记录情况。