项目后端的人两个月成效不大,耐不住性子我也加入学习了Flask一些基础,以下进行简要记录
if __name__ == '__main__':
app.run(debug=True)
@app.route('/up_photo', methods=['POST'], strict_slashes=False)
def func():
==============
#add more
==============
return jsonify({...})
Flask的blueprint机制
在app.py加入:
app = Flask(name)
app.register_blueprint(user, url_prefix=’/user’)
注:主页里是【@app.route(‘/xxx’, methods=[‘POST’], strict_slashes=False)】这样
在对应api的py文件里加入:
导入flask模块
from flask import Blueprint, Flask, request
创建类的实例
user = Blueprint(‘user’, name)
注:api里是【@user.route(‘/xxx’, methods=[‘POST’])】这样
对api进行访问,比如上述例子:127.0.0.1:5000/user/xxx
Flask的数据库访问
建立连接
from DBUtils.PersistentDB import PersistentDB导入数据库模块
import pymysql
创建数据库连接池
POOL = PersistentDB(
creator=pymysql, # 使用链接数据库的模块
maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制
setsession=[], # 开始会话前执行的命令列表。
ping=0, # ping MySQL服务端,检查是否服务可用。
closeable=False, # 如果为False时, conn.close() 实际上被忽略,供下次使用,再线程关闭时,才会自动关闭链接。如果为True时, conn.close()则关闭链接,那么再次调用pool.connection时就会报错,因为已经真的关闭了连接(pool.steady_connection()可以获取一个新的链接)
threadlocal=None, # 本线程独享值得对象,用于保存链接对象,如果链接对象被重置
host=’127.0.0.1’,
port=3306,
user=’root’,
password=’123456’,
database=’test’,
charset=’utf8’
)
执行函数
def func(sql):
conn = POOL.connection(shareable=False)
cursor = conn.cursor()
cursor.execute(sql)
result = cursor.fetchall()
cursor.close()
conn.close()
return result
封装Sql
#!/usr/bin/python3-- coding: utf-8 --
from mySQL_config import func
定义用户表类
class UserTB:
‘’’ 用户增加、查询 ‘’’
def __init__(self, user, pwd):
self.user = user
self.pwd = pwd
# 查询用户名
def selectUser(self):
sql = "select * from user where USERNAME = '%s'" % self.user
result = func(sql)
return result
# 查询用户名密码
def selectUserPwd(self):
sql = "select * from user where USERNAME = '%s' and PASSWORD = '%s' limit 1" % (self.user, self.pwd)
result = func(sql)
return result
# 插入
def insetinto(self):
sql = "INSERT INTO user (USERNAME,PASSWORD) VALUES ('%s','%s')" % (self.user, self.pwd)
# print('user',self.user)
# print('pwd',self.pwd)
result = func(sql)
# print('result',result)
return result
使用
#!/usr/bin/python3 # -*- coding: utf-8 -*- # 导入flask模块 from flask import Blueprint, Flask, request # 导入json模块 import json # 导入自定义模块 from Utils.utils import * # 导入mySQL_config from sql.user import UserTB import base64 # 创建类的实例 user = Blueprint('user', __name__) # 登录 # @param:{string} user 用户名 # @param:{string} pwd 密码 # @returns:{json} #确定服务器ip地址 host_ip='192.168.1.105' # 登入逻辑 @user.route('/login', methods=['POST']) def login(): if request.method == 'POST': # POST、GET: # request.form获得所有post参数放在一个类似dict类中,to_dict()是字典化 # 单个参数可以通过request.form.to_dict().get("xxx","")获得 param = request.form.to_dict() if param.get("username") != None and param.get("password") != None: if param.get("username") != "" and len(param.get("username")) < 8 and param.get("password") != "" and len( param.get("password")) < 20: user = UserTB(param.get("username"), param.get("password")) user = user.selectUser() if user == (): content = json.dumps(formatres(False, {}, "用户名不存在"), ensure_ascii=False) else: userPwd = UserTB(param.get("username"), param.get("password")) userPwd = userPwd.selectUserPwd() if userPwd == (): content = json.dumps(formatres(False, {}, "用户名密码不正确"), ensure_ascii=False) else: content = json.dumps(formatres(True, {}, "登录成功"), ensure_ascii=False) else: content = json.dumps(formatres(False, {}, "用户名密码为空或者过长"), ensure_ascii=False) else: content = json.dumps(formatres(False, {}, "请检查参数"), ensure_ascii=False) else: content = json.dumps(formatres(False, {}, "101")) resp = Response_headers(content) return content # 注册逻辑 @user.route('/register', methods=['POST']) def register(): if request.method == 'POST': param = request.form.to_dict() if param.get("username") != None and param.get("password") != None and param.get("password2") != None: if param.get("password") == param.get("password2"): if param.get("username") != "" and len(param.get("username")) < 8 and param.get("password") != "" and len( param.get("password")) < 20: user = UserTB(param.get("username"), param.get("password")) user = user.selectUser() if user != (): content = json.dumps(formatres(False, {}, "用户名已存在"), ensure_ascii=False) else: try: insetintouser = UserTB(param.get("username"), param.get("password")) insetintouser = insetintouser.insetinto() if insetintouser == (): content = json.dumps(formatres(True, {}, "用户创建成功"), ensure_ascii=False) else: content = json.dumps(formatres(False, {}, "用户创建失败"), ensure_ascii=False) except: content = json.dumps(formatres(False, {}, "用户创建失败,可能用户名过长"), ensure_ascii=False) else: content = json.dumps(formatres(False, {}, "用户名密码为空或者过长"), ensure_ascii=False) else: content = json.dumps(formatres(False, {}, "两次输入密码不一致"), ensure_ascii=False) else: content = json.dumps(formatres(False, {}, "请检查参数"), ensure_ascii=False) else: content = json.dumps(formatres(False, {}, "101")) resp = Response_headers(content) return content