Python使用ORM框架peewee操作MySQL数据库
操作mysql相关资料
安装插件
print("Hello peewee !")
# Python 进阶(二):Python使用ORM框架peewee操作MySQL数据库
# https://blog.csdn.net/qq_29864051/article/details/131359489
# pip install peewee
# 安装mysql数据库客户端
# pip install pymysql
目录结构
(text) (base) youtai@macdeMacBook-Pro peewee % tree
.
├── 1.peewee操作mysql.py
├── __init__.py # 空文件
├── database.py
├── post_put_delete_book.py
└── select_book.py
连接数据库
database.py
from peewee import *
# 启用 SQL 调试
import logging
logger = logging.getLogger('peewee')
logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.DEBUG)
# CREATE TABLE `book` (
# `id` int(10) unsigned NOT NULL AUTO_INCREMENT,
# `title` varchar(255) DEFAULT NULL COMMENT '书名',
# `author` varchar(255) DEFAULT NULL COMMENT '作者',
# `price` float(10,2) DEFAULT NULL COMMENT '书名价格',
# `edition` int(11) DEFAULT NULL COMMENT '版次',
# PRIMARY KEY (`id`)
# ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
# 创建MySQL数据库连接
db = MySQLDatabase('zero-sys', user='root', password='123456',host='localhost', port=3306)
class Book(Model):
id = AutoField(primary_key=True)
title = CharField()
author = CharField()
price = FloatField()
edition = IntegerField()
class Meta:
database = db
table_name = 'book'
def to_string(self):
# 返回 self 的字符串表示 如:book.id, book.title, book.author, book.price, book.edition
# return f"Book(id={self.id}, title={self.title}, author={self.author}, price={self.price}, edition={self.edition})"
# 返回 self 的 json对象
return {
"id": self.id,
"title": self.title,
"author": self.author,
"price": self.price,
"edition": self.edition
} # 字典形式
# # 连接数据库
# db.connect()
# 尝试连接数据库
try:
db.connect()
print("数据库连接成功========================")
except Exception as e:
print(f"连接数据库时发生错误: {e}")
# # # 创建表
# db.create_tables([Book])
# 删除表
# db.drop_tables([Book])
# 获取表
# tables = db.get_tables()
# print(tables)
操作数据库 post_put_delete_book
post_put_delete_book.py
# 引入数据库连接模块和Book映射类
from database import db, Book
# # 插入数据
book = Book(title="三体一", author="刘慈欣", price=59.5, edition=6)
res1 = book.save()
print(f"res1:{res1}, book.id:{book.id}")
book = Book(title="1984", author="乔治奥威尔", price=47.9, edition=2)
res2 =book.save()
print(f"res2:{res2}, book.id:{book.id}")
book = Book(title="万历十五年", author="黄仁宇", price=125.5, edition=3)
res3 = book.save()
print("res3:",res3, "book.id:", book.id)
## 更新数据
# book = Book.get_by_id(1)
# print("修改前:", book.price)
# book.price = 24.6
# book.save()
# print("修改后:", book.price)
# query = Book.update(price=Book.price + 1).where(Book.id == 1)
# query.execute()
# ('UPDATE `book` SET `price` = (`book`.`price` + %s) WHERE (`book`.`id` = %s)', [1.0, 1])
query = Book.update(price=(Book.price + (Book.price * .1)))
query.execute()
# ('UPDATE `book` SET `price` = (`book`.`price` + (`book`.`price` * %s))', [0.1])
# print("删除前数据:")
# books = Book.select()
# for book in books:
# print(book.id, book.title, book.author, book.price, book.edition)
# book = Book.get(Book.title == '1984')
# book.delete_instance()
# # 使用where条件删除数据
# Book.delete().where(Book.edition == 3).execute()
# print("删除后数据:")
# books = Book.select()
# for book in books:
# print(book.id, book.title, book.author, book.price, book.edition)
查询mysql
select_book.py
# 引入数据库连接模块和Book映射类
from database import db, Book
from peewee import *
# 查询指定ID数据 查询单条数据
def select_book_by_id(book_id):
print("[查询指定ID数据]--------------------")
book = Book.get(Book.id == book_id)
print("ID查询 id:", book.id)
print("ID查询1:", book.to_string())
# 查询所有数据
def select_book_all():
print("[查询所有数据]--------------------")
bookquery = Book.select()
print(f"query all: {bookquery}")
books = bookquery.execute()
for book in books:
print("全部查询:", book.to_string())
# 执行sql语句 这里返回的是一个包含元组的列表
def execute_sql(sql):
print(f"execute sql: {sql}")
return db.execute_sql(sql)
# 执行sql语句 这里返回的是book模型对象
def execute_sql_return_model(sql):
print(f"execute sql: {sql}")
cursor = db.execute_sql(sql)
# 获取查询结果并转换为 Book 模型对象
books = []
for row in cursor.fetchall():
book = Book.select().where(Book.id == row[0]).get()
books.append(book)
return books
## 加()打印sql语句,并执行查询多行
def select_book_by_id_and_title(book_id, book_title):
print("[查询指定ID和名称数据]--------------------")
# books1 = (Book.select().where(Book.id == book_id).where(Book.title == book_title))
q = Book.select()
if book_id:
q = q.where(Book.id == book_id)
if book_title:
# q = q.where(Book.title == book_title)
q = q.where(Book.title.startswith(book_title)) ## `title` LIKE '三体%'
# q = q.where(Book.title.endswith(book_title)) ## `title` LIKE '%三体'
# q = q.where(Book.title.contains(book_title)) ## `title` LIKE '%三体%
count =q.count()
# print(f"query count by id and title: {count}")
print(f"query by id and title: {q}")
bookList = q
return bookList,count
## 加()打印sql语句,并执行查询多行
def select_book_by_price(price):
books = (Book.select().where(Book.price < price))
print(f"query by price: {books}")
for book in books:
print("价格小于100:", book.to_string())
## 聚合查询 count(id),sum(price)
def select_book_select_count():
# count Books.price
books = (Book.select(
fn.COUNT(Book.id).alias('count_id'),
fn.SUM(Book.price).alias('sum_price'),
))
print(f"query by price count: {books}")
# query by price count: SELECT COUNT(`t1`.`id`) AS `count_id`, SUM(`t1`.`price`) AS `sum_price` FROM `book` AS `t1`
for book in books:
print(f"count_id:{book.count_id} sum_price:{book.sum_price}")
# ('# count_id:8 sum_price:605.99
# select by price 排序查询
def select_book_by_price_desc():
books = (Book.select().order_by(Book.price.desc()))
print(f"query by price asc: {books}")
for book in books:
print("价格升序排序:", book.to_string())
# 限制返回结果数量
def select_book_by_limit(limit_num):
books = (Book.select().limit(limit_num))
print(f"query by limit: {books}")
for book in books:
print(f"限制返回{limit_num}条结果:", book.to_string())
# 限制返回结果数量
def select_book_offset_limit(page,page_size):
offset = (page-1)*page_size
books = (Book.select().offset(offset).limit(page_size))
print(f"query by limit: {books}")
for book in books:
print(f"分页查询:", book.to_string())
# 限制返回结果数量
def select_book_paginate(page,page_size):
# 跳过前面page-1页,每页page_size条 如:page=3,page_size=3 跳过前面2页,每页3条 即:offset=6,limit=3
books = (Book.select().paginate(page,page_size))
print(f"query by limit: {books}")
for book in books:
print(f"分页查询:", book.to_string())
# 限制返回结果数量
def select_book_list(author,price,page=1,page_size=5):
q = Book.select()
if author:
q = q.where(Book.author == author)
if price:
q = q.where(Book.price < price )
conut = (q.count())
# print(f"query count: {conut}")
offset = (page-1) * page_size
booksquery = (q.offset(offset).limit(page_size))
print(f"query by limit: {booksquery}")
return booksquery,conut
# select_book_by_id(1)
# select_book_all()
# select_book_by_price(100)
# select_book_select_count()
# select_book_by_price_desc()
# select_book_by_limit(2)
# select_book_offset_limit(2,2)
# select_book_paginate(3,3)
# bookList,count = select_book_list('乔治奥威尔',100)
# print(f"count---: {count}")
# for book in bookList:
# print(f"分页查询:", book.to_string())
# bookList,count = select_book_by_id_and_title(1,"三体一")
# print(f"count---: {count}")
# for book in bookList:
# print(f"title:{ book.title}")
# print(f"book:{ book.to_string()}")
# 执行sql语句 这里返回的是一个包含元组的列表
# querySql = "SELECT * FROM book WHERE author = '刘慈欣' AND price < 100"
# bookList = execute_sql(querySql)
# for book in bookList:
# print(f"book type:{type(book)} book:{ book}")
# print(f"book author: author:{ book[1]}")
querySql = "SELECT * FROM book WHERE author = '刘慈欣' AND price < 100"
bookList = execute_sql_return_model(querySql)
for book in bookList:
print(f"book type:{type(book)} book:{ book.to_string()}")
print(f"book author: author:{ book.author}")
print("")
print("")
# 聚合查询 count(id),sum(price) 返回元组列表
# def select_book_tuples():
# books = (Book
# .select(Book.id, Book.title, fn.Count(Book.id).alias('countId'))
# .group_by(Book.id)
# .tuples())
# # iterate over a list of 2-tuples containing the url and count
# for id, book_title,countId in books:
# print(f"book_id: {id}, book_title: {book_title}, countId: {countId}")
# # book_id: 4, book_title: 三体一, countId: 1
# # book_id: 5, book_title: 1984, countId: 1
# select_book_tuples()
# 聚合查询 count(id),sum(price) 返回字典列表
def select_book_dicts():
books = (Book
.select(Book.id, Book.title, fn.Count(Book.id).alias('countId'))
.group_by(Book.id)
.dicts())
# iterate over a list of 2-tuples containing the url and count
for book in books:
print(f"book_id: {book['id']}, book_title: {book['title']}, countId: {book['countId']}")
# book_id: 4, book_title: 三体一, countId: 1
# book_id: 5, book_title: 1984, countId: 1
select_book_dicts()