python 封装 mysql 自定义类

学习笔记 马富天 2020-03-30 16:18:44 16 0

【摘要】这段时间需要经常使用 python 来操作 mysql 进行数据处理工作,由于之前一直没有对此进行整理,导致每次遇到对数据库的操作都是重新写,没有一套固定的代码,不够规范,为了能够方便以后的工作,能够快速上手,所以本文对此记录,仅供个人参考使用!另外,非常欢迎大家能够提出建议,使得代码更加完善!

完整代码如下:

  1. # -*- coding: utf-8 -*-
  2.  
  3. __author__ = "mafutian"
  4. __time__ = "2020-03-27"
  5.  
  6. import sys
  7. import pymysql
  8.  
  9. class myDb():
  10.     objs = dict()
  11.  
  12.     def __new__(cls,db_env = ''):
  13.         if db_env not in myDb.objs.keys():
  14.             obj = object.__new__(cls)
  15.             myDb.objs[db_env] = obj
  16.         else:
  17.             obj = myDb.objs[db_env]
  18.         return obj
  19.  
  20.      
  21.     def __init__(self,db_env = '',db_charset = 'utf8mb4',db_port = 3306):
  22.          
  23.         self.__conn = None
  24.         self.__cursor = None
  25.          
  26.         dev_config = {
  27.             'db_host':'127.0.0.1',
  28.             'db_username':'root',
  29.             'db_password':'root',
  30.             'db_name':'test',
  31.             'db_charset':db_charset,
  32.             'db_port':db_port,
  33.         }
  34.  
  35.         test_config = {
  36.             'db_host':'',
  37.             'db_username':'',
  38.             'db_password':'',
  39.             'db_name':'',
  40.             'db_charset':db_charset,
  41.             'db_port':db_port,
  42.         }
  43.  
  44.         pro_config = {
  45.             'db_host':'',
  46.             'db_username':'',
  47.             'db_password':'',
  48.             'db_name':'',
  49.             'db_charset':db_charset,
  50.             'db_port':db_port,
  51.         }
  52.  
  53.         if db_env == 'pro_db':
  54.             db_config = pro_config
  55.         elif db_env == 'test_db':
  56.             db_config = test_config
  57.         elif db_env == 'dev_db':
  58.             db_config = dev_config
  59.         else:
  60.             self.error_msg('database connect failed!')
  61.         try:
  62.             self.__conn = pymysql.connect(
  63.                 host = db_config['db_host'], 
  64.                 user = db_config['db_username'], 
  65.                 passwd = db_config['db_password'], 
  66.                 db = db_config['db_name'], 
  67.                 port = db_config['db_port'], 
  68.                 charset = db_config['db_charset'],
  69.             )
  70.         except Exception as e:
  71.             self.error_msg('database connect failed:',e)
  72.         else:
  73.             self.__cursor = self.__conn.cursor(cursor = pymysql.cursors.DictCursor)
  74.  
  75.     def __del__(self):
  76.         try:
  77.             self.__cursor.close()
  78.             self.__conn.close()
  79.         except:
  80.             pass
  81.      
  82.     def error_msg(self,msg = '',e = ''):
  83.         sys.exit(msg + str(e))
  84.  
  85.     def query(self,sql,values = []):
  86.         try:
  87.             if not values:
  88.                 self.__cursor.execute(sql)
  89.             else:
  90.                 self.__cursor.execute(sql,values)
  91.         except Exception as e:
  92.             self.error_msg('execute failed:',e)
  93.         res = self.__cursor.fetchall()
  94.         return res
  95.  
  96.     def query_one(self,sql,values = []):
  97.         try:
  98.             if not values:
  99.                 self.__cursor.execute(sql)
  100.             else:
  101.                 self.__cursor.execute(sql,values)
  102.         except Exception as e:
  103.             self.error_msg('execute failed:',e)
  104.         res = self.__cursor.fetchone()
  105.         return res
  106.  
  107.     def select(self,table_name,columns = ['*'],condition = ''):
  108.         condition = ' where ' + condition if condition else None
  109.         if condition:
  110.             sql = "select %s from %s %s" % (','.join(columns),table_name,condition)
  111.         else:
  112.             sql = "select %s from %s" % (','.join(columns),table_name)
  113.         try:
  114.             self.__cursor.execute(sql)
  115.         except Exception as e:
  116.             self.error_msg('execute failed:',e)
  117.         return self.__cursor.fetchall()
  118.  
  119.     def insert(self,table_name,columns = (),val = (),auto_commit = False):
  120.         lens = len(columns)
  121.         sql = "INSERT INTO " + table_name + "(" + ','.join(columns) + ") VALUES " + "(" + ','.join(['%s'] * lens) + ")"
  122.         try:
  123.             self.__cursor.executemany(sql,val)
  124.             if auto_commit == True:
  125.                 self.commit()
  126.         except Exception as e:
  127.             self.error_msg('execute failed:',e)
  128.             self.rollback()
  129.         return self.rowcount()
  130.  
  131.     def insert_one(self,table_name = '',data = {},auto_commit = False):
  132.         lens = len(data)
  133.         placeholder = ['%s'] * lens
  134.         fields = []
  135.         values = []
  136.         for field,value in data.items():
  137.             fields.append(field)
  138.             values.append(value)
  139.          
  140.         sql = "INSERT INTO " + table_name + "(" + ','.join(fields) + ") VALUES (" + ','.join(placeholder) + ")"
  141.         try:
  142.             self.__cursor.execute(sql,values)
  143.             if auto_commit == True:
  144.                 self.commit()
  145.         except Exception as e:
  146.             print('execute failed:',e)
  147.         return self.rowcount()
  148.  
  149.     def execute(self,sql,values = [],auto_commit = False):
  150.         try:
  151.             if not values:
  152.                 self.__cursor.execute(sql)
  153.             else:
  154.                 self.__cursor.execute(sql,values)
  155.             if auto_commit == True:
  156.                 self.commit()
  157.         except Exception as e:
  158.             self.error_msg('execute failed:',e)
  159.         return self.rowcount()
  160.  
  161.     def rowcount(self):
  162.         return self.__cursor.rowcount
  163.  
  164.     def last_insert_id(self):
  165.         return self.__conn.insert_id()
  166.  
  167.     def commit(self):
  168.         self.__conn.commit()
  169.  
  170.     def rollback(self):
  171.         self.__conn.rollback()
  172.  
  173.     def close(self):
  174.         self.__del__()
  175.  
  176. if __name__ == '__main__':
  177.      
  178.     '''
  179.         用到的表:
  180.         CREATE TABLE `test` (
  181.         `id` int(11) NOT NULL AUTO_INCREMENT,
  182.         `name` varchar(255) DEFAULT NULL,
  183.         `age` int(11) DEFAULT NULL,
  184.         PRIMARY KEY (`id`)
  185.         ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
  186.     '''
  187.  
  188.     db = myDb(db_env = 'dev_db')
  189.     table_name = 'test'
  190.     columns = ('name','age')
  191.     val = (('mafutian',20),('zhangsan',18))
  192.     rowcount = db.insert(table_name = table_name,columns = columns,val = val)
  193.     db.commit()
  194.     if rowcount > 0:
  195.         print('insert success')
  196.  
  197.  
  198.     data = {
  199.         'name':'hello world',
  200.         'age':16,
  201.     }
  202.     rowcount = db.insert_one(table_name = table_name,data = data,auto_commit = True)
  203.     if rowcount > 0:
  204.         print('insert success')
  205.     res = db.select(table_name = table_name,columns = columns,condition = '')
  206.     # print(res)
  207.  
  208.  
  209.     sql = 'select id,name,age from test where id >= %s'
  210.     res = db.query_one(sql,['5'])
  211.     rowcount = db.rowcount()
  212.     if rowcount > 0:
  213.         print('find success,total is ',rowcount)
  214.     # print(res)
  215.  
  216.  
  217.     res = db.query(sql,['2'])
  218.     rowcount = db.rowcount()
  219.     if rowcount > 0:
  220.         print('find success,total is ',rowcount)
  221.     # print(res)
  222.      
  223.     sql = 'update test set name = %s where id = %s'
  224.     rowcount = db.execute(sql,['python',1])
  225.     db.commit()
  226.     if rowcount > 0:
  227.         print('data change,update success')
  228.  
  229.  
  230.     sql = 'delete from test where id = 2'
  231.     rowcount = db.execute(sql)
  232.     db.commit()
  233.     if rowcount > 0:
  234.         print('data change,delete success')

代码没有写太多注释,因为主要是给我自己整理、使用的,所以没有写注释啦,但是相信大家都能够看懂,所以期待大家能够给出建议啊~

版权归 马富天个人博客 所有

本文标题:《python 封装 mysql 自定义类》

本文链接地址:http://www.mafutian.com/442.html

转载请务必注明出处,小生将不胜感激,谢谢! 喜欢本文或觉得本文对您有帮助,请分享给您的朋友 ^_^

0

0

上一篇《 python3 邮件发送(多收件人,多抄送人、多附件) 》 下一篇《 MySQL 系统变量介绍——全局变量和会话变量 》

暂无评论

评论审核未开启
表情 表情 表情 表情 表情 表情 表情 表情 表情 表情 表情 表情 表情 表情 表情 表情 表情 表情 表情 表情 表情 表情 表情 表情
验证码

TOP10

  • 浏览最多
  • 评论最多