iis服务器助手广告广告
返回顶部
首页 > 资讯 > 数据库 >Python 操作MySql数据库(封装、优雅)
  • 887
分享到

Python 操作MySql数据库(封装、优雅)

数据库mysqlpython 2023-08-24 21:08:58 887人浏览 安东尼
摘要

python 记录操作Mysql数据库(封装)——优雅 前言封装代码进行测试结果展示 前言 学了pymysql第三方库(pip install pymysql)来操作MySql数据库后,浅记一下对MySql进行 《关于我的M

前言

学了pymysql第三方库(pip install pymysql)来操作MySql数据库后,浅记一下对MySql进行 《关于我的MySql之优雅封装这件事儿》。这里只涉及了单表的增、删、查、改这些简单地封装操作,便于对简单的sql语句进行快速使用。

首页,利用Navicat连接本地的MySql数据库来快速新建一个(Pythondemo)数据库,在里面快速创建两张空表(tb_test,tb_user)用于测试,其中主要使用tb_user表来进行数据测试,如下图进行tb_user的简单设计(将其中的id字段设为自动递增,便于在insert插入数据时少写一个字段。):


在这里插入图片描述


能实现简单的功能有:

  • 以元组类型快速insert插入数据
  • 通过id删除数据
  • 通过传入id快速查询返回一条数据,返回结果
  • 直接调用方法查询表中所有数据,返回结果
  • 指定查询返回多少条数数据,可根据简单条件查询(where 字段=”“),返回结果
  • 通过id修改数据
  • 可通过自定义sql语句执行

将以下代码可复制到一个py模块中即可运行(注意:要修改自己连接的数据库配置)!!!


封装代码


# -*- coding:utf-8 -*-"""@File : my_mysql.py@Time : 2022/9/3 17:32"""import pymysqlclass MySql:    def __init__(self, operate_tablename:str,my_sqldb_config_param:dict):        assert isinstance(my_sqldb_config_param,dict),"请以字典类型的格式传入!"        self._operate_tablename = operate_tablename        try:            self._conn = pymysql.connect(**my_sqldb_config_param) # 连接数据库,配置参数            self._cursor = self._conn.cursor() # 创建一个游标,用来执行查询            self._get_field() # 获取此表中的字段名        except Exception as e:            raise Exception(f"数据库连接失败!!!\n请检查表名、配置参数是否正确或检查本地数据库是否已启动!\n{e}")    # 获取_conn对象    @property    def get_connect(self):        return self._conn    # 获取_cursor对象    @property    def get_cursor(self):        return self._cursor    # 获取__desc对象    @property    def get_description(self):        # print(f"{self._operate_tablename}表中的字段属性:",self._desc)        return self._desc    # 获取正在操作的表名    @property    def operate_tablename(self):        return f"正在操作 {self._operate_tablename}表!!!"    # 修改要操作的表    @operate_tablename.setter    def operate_tablename(self,operate_tablename):        assert operate_tablename !="", "请输入要操作的表名!"        print(f"{self._operate_tablename} 表已被更换!")        self._operate_tablename = operate_tablename        self._get_field()    # 获取此表中的字段名    def _get_field(self):        self._cursor.execute(f"select * from {self._operate_tablename}")        self._desc = self._cursor.description        self._field_ = []        for field in self._desc:            self._field_.append(field[0])    # 执行sql语句    def _sql(self,sql,msg=""):        try:            self._cursor.execute(sql)  # 执行sql语句            self._conn.commit() # 执行sql语句后,进行提交            if msg:print(f"数据{msg}成功!")            return True        except Exception as e:            if msg:print(f"\033[31m数据{msg}失败!!!\n{e} \033[0m")            self._conn.rollback()  # 执行sql语句失败,进行回滚            return False    # 插入数据    def insert(self, *value):        if not isinstance(value[0],tuple): raise Exception("要求传入的参数类型为tuple元组!!!")        if len(value) == 1: value=value[0]        else:value = str(value)[1:-1]        sql = f"insert into {self._operate_tablename}({','.join(self._field_[1:])}) values {value}"        if not self._sql(sql,f"{value}插入"):            print("\n\033[31m:请检查每一条记录字段是否正确!!!\033[0m\n")    # 插入:自定义sql语句插入数据    def insert_by_sql(self, sql):        self._sql(sql,"插入")    # 删除:通过id删除该条数据    def delete_by_id(self,id_:int):        sql = f"delete from {self._operate_tablename} where id = {id_}"        if self._sql(sql):print(f"id={id_}记录,删除成功!")        else:print(f"\n\033[31m:id = {id_}记录,删除失败!!!\033[0m\n")    # 删除:自定义sql语句删除数据    def delete_by_sql(self, sql):        self._sql(sql,"删除")    # 修改:通过id修改数据    def update_by_id(self, id_:int, set_field:dict):        assert isinstance(set_field,dict),"请以字典类型的格式传入!"        tempset_field = []        for i in set_field:            tempset_field.append(f"{i}='{set_field[i]}'")        set_field = ",".join(tempset_field)        sql = f"update {self._operate_tablename} set {set_field} where id = {id_}"        if self._sql(sql):print(f"id={id_}记录,{set_field}修改成功!")        else:print(f"\n\033[31m:id = {id_}记录,{set_field}修改失败!!!\033[0m\n")    # 修改:自定义sql语句修改数据    def update_by_sql(self, sql):        self._sql(sql,"修改")    # 查询:通过id查询一条数据    def select_by_id(self,id_:int,field="*"):        if field != "*": field = ','.join(field)        sql = f"select {field} from {self._operate_tablename} where id={id_}"        self._cursor.execute(sql)        return self._cursor.fetchone()    # 查询:指定查询多少条数数据,可根据简单条件查询(where 字段=”“)    def select_many(self,num:int,query_builder=None,field="*"):        if field != "*": field = ','.join(field)        sql = f"select {field} from {self._operate_tablename}"        if query_builder:            if isinstance(query_builder,dict) and len(query_builder) == 1:                query_builder = list(query_builder.items())[0]                sql = f"select {field} from {self._operate_tablename} where {query_builder[0]}='{query_builder[1]}'"            else: raise Exception("要求输入的条件为dict(字典)类型并且只能有一对键值(:len(dict)=1)!!!")        self._cursor.execute(sql)        return self._cursor.fetchmany(num)    # 查询:所有数据    def select_all(self, field="*"):        if field != "*": field = ','.join(field)        sql = f"select {field} from {self._operate_tablename}"        self._cursor.execute(sql)        return self._cursor.fetchall()    # 查询:自定义sql语句查询数据    def select_by_sql(self, sql):        try:            self._cursor.execute(sql)            return self._cursor.fetchall()        except Exception as e:            print(f"\033[31m:数据查询失败!!!\n{e} \033[0m")    # 当对象被销毁时,游标先关闭,连接后关闭    def __del__(self):        self._cursor.close()        self._conn.close()        

进行测试


注意下面传入数据的格式!!!

def my_mysql_test(operate_tablename:str, my_sqldb_config_param:dict):    print("-------------my_mysql_test:注意下面传入数据的格式---------------")    # 创建自己的mysql连接对象,operate_tablename是要进行操作的表名,my_sqldb_config_param是pymysql连接本机MySQL所需的配置参数    mysql = MySql(operate_tablename=operate_tablename, my_sqldb_config_param=my_sqldb_config_param)    print(mysql.operate_tablename)    mysql.operate_tablename = "tb_user"   # 修改要操作的表    print(mysql.operate_tablename)    print("----------------insert-----------------")    # 自定义sql语句,插入多条数据,这里我们在创建表时,设置了id自动增加,所以这里不需要设置id字段    mysql.insert_by_sql('insert into tb_user(name,age,gender) values ("111", 12, "男"), ("222", 22, "女"),("333", 32, "女")')    mysql.insert(("444", 42, "男"))  # 插入一条数据    mysql.insert(("555",52,"男"),("666",62,"女"))  # 插入多条数据    # mysql.insert("777",72,"男") # 异常    # mysql.insert(["888",82,"女"]) # 异常    mysql.insert(("999", 92, "男"), ("000", '28', "女", "xxx"))  # 异常    print("----------------select-----------------")    result = mysql.select_by_sql("select * from tb_user where gender='男'")  # 自定义sql语句查询数据    print("查询:自定义sql查询数据:\n", result)    result = mysql.select_all() # 查询表中所有数据,返回表中所有数据    print("查询:表中所有数据:\n", result)    result = mysql.select_by_id(1) # 通过id查询,返回一条数据    print("\n查询:通过id:", result)    result = mysql.select_many(1, {"gender": "女"}) # 指定查询返回多少条数数据,可根据简单条件查询(where 字段=”“)    print('查询:指定查询多少条数数据,可根据简单条件查询(where 字段=”“):', result)    print("----------------delete-----------------")    mysql.delete_by_sql('delete from tb_user where gender="男"')  # 自定义sql语句删除数据    mysql.delete_by_id(4) # 通过id删除数据    result = mysql.select_all()    print("删除数据后查询表中所有数据:\n", result)    print("----------------update-----------------")    mysql.update_by_sql("update tb_user set name='update_name',gender='男' where id = 6")  # 自定义sql语句更新数据    mysql.update_by_id(3, {"age": "180"})   # 通过id更新数据    mysql.update_by_id(2, {"name": "update_name", "age": "999"})    mysql.update_by_id(6, {"xxx": "updateName", "yyy": "18"}) # 异常    # mysql.update_by_id(1, ("age","180"))  # 异常    result = mysql.select_all()    print("更新数据后查询表中所有数据:\n", result)if __name__ == '__main__':    my_sqldb_config_param = {        "host": "127.0.0.1", #连接主机的ip        "port": 3306,   #连接主机的端口        "user": "root", #本地数据库的用户名        "passWord": "***", #本地数据库的密码        "database": "pythondemo", #连接的数据库        "charset": "utf8" #设置编码格式    }    operate_tablename = "tb_test" #设置该数据库准备操作的表名    my_mysql_test(operate_tablename,my_sqldb_config_param)    

结果展示


控制台打印的结果:

在这里插入图片描述


数据库中的结果:

在这里插入图片描述


来源地址:https://blog.csdn.net/qq_59142194/article/details/126681062

您可能感兴趣的文档:

--结束END--

本文标题: Python 操作MySql数据库(封装、优雅)

本文链接: https://www.lsjlt.com/news/379740.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • oracle怎么查询当前用户所有的表
    要查询当前用户拥有的所有表,可以使用以下 sql 命令:select * from user_tables; 如何查询当前用户拥有的所有表 要查询当前用户拥有的所有表,可以使...
    99+
    2024-05-14
    oracle
  • oracle怎么备份表中数据
    oracle 表数据备份的方法包括:导出数据 (exp):将表数据导出到外部文件。导入数据 (imp):将导出文件中的数据导入表中。用户管理的备份 (umr):允许用户控制备份和恢复过程...
    99+
    2024-05-14
    oracle
  • oracle怎么做到数据实时备份
    oracle 实时备份通过持续保持数据库和事务日志的副本来实现数据保护,提供快速恢复。实现机制主要包括归档重做日志和 asm 卷管理系统。它最小化数据丢失、加快恢复时间、消除手动备份任务...
    99+
    2024-05-14
    oracle 数据丢失
  • oracle怎么查询所有的表空间
    要查询 oracle 中的所有表空间,可以使用 sql 语句 "select tablespace_name from dba_tablespaces",其中 dba_tabl...
    99+
    2024-05-14
    oracle
  • oracle怎么创建新用户并赋予权限设置
    答案:要创建 oracle 新用户,请执行以下步骤:以具有 create user 权限的用户身份登录;在 sql*plus 窗口中输入 create user identified ...
    99+
    2024-05-14
    oracle
  • oracle怎么建立新用户
    在 oracle 数据库中创建用户的方法:使用 sql*plus 连接数据库;使用 create user 语法创建新用户;根据用户需要授予权限;注销并重新登录以使更改生效。 如何在 ...
    99+
    2024-05-14
    oracle
  • oracle怎么创建新用户并赋予权限密码
    本教程详细介绍了如何使用 oracle 创建一个新用户并授予其权限:创建新用户并设置密码。授予对特定表的读写权限。授予创建序列的权限。根据需要授予其他权限。 如何使用 Oracle 创...
    99+
    2024-05-14
    oracle
  • oracle怎么查询时间段内的数据记录表
    在 oracle 数据库中查询指定时间段内的数据记录表,可以使用 between 操作符,用于比较日期或时间的范围。语法:select * from table_name wh...
    99+
    2024-05-14
    oracle
  • oracle怎么查看表的分区
    问题:如何查看 oracle 表的分区?步骤:查询数据字典视图 all_tab_partitions,指定表名。结果显示分区名称、上边界值和下边界值。 如何查看 Oracle 表的分区...
    99+
    2024-05-14
    oracle
  • oracle怎么导入dump文件
    要导入 dump 文件,请先停止 oracle 服务,然后使用 impdp 命令。步骤包括:停止 oracle 数据库服务。导航到 oracle 数据泵工具目录。使用 impdp 命令导...
    99+
    2024-05-14
    oracle
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作