项目后端的人两个月成效不大,耐不住性子我也加入学习了Flask一些基础,以下进行简要记录
if __name__ == '__main__':
app.run(debug=True)
@app.route('/up_photo', methods=['POST'], strict_slashes=False)
def func():
==============
#add more
==============
return jsonify({...})
Flask的blueprint机制
在app.py加入:
app = Flask(name)
app.register_blueprint(user, url_prefix=’/user’)
注:主页里是【@app.route(‘/xxx’, methods=[‘POST’], strict_slashes=False)】这样
在对应api的py文件里加入:
导入flask模块
from flask import Blueprint, Flask, request
创建类的实例
user = Blueprint(‘user’, name)
注:api里是【@user.route(‘/xxx’, methods=[‘POST’])】这样
对api进行访问,比如上述例子:127.0.0.1:5000/user/xxx
Flask的数据库访问
建立连接
from DBUtils.PersistentDB import PersistentDB导入数据库模块
import pymysql
创建数据库连接池
POOL = PersistentDB(
creator=pymysql, # 使用链接数据库的模块
maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制
setsession=[], # 开始会话前执行的命令列表。
ping=0, # ping MySQL服务端,检查是否服务可用。
closeable=False, # 如果为False时, conn.close() 实际上被忽略,供下次使用,再线程关闭时,才会自动关闭链接。如果为True时, conn.close()则关闭链接,那么再次调用pool.connection时就会报错,因为已经真的关闭了连接(pool.steady_connection()可以获取一个新的链接)
threadlocal=None, # 本线程独享值得对象,用于保存链接对象,如果链接对象被重置
host=’127.0.0.1’,
port=3306,
user=’root’,
password=’123456’,
database=’test’,
charset=’utf8’
)
执行函数
def func(sql):
conn = POOL.connection(shareable=False)
cursor = conn.cursor()
cursor.execute(sql)
result = cursor.fetchall()
cursor.close()
conn.close()
return result
封装Sql
#!/usr/bin/python3-- coding: utf-8 --
from mySQL_config import func
定义用户表类
class UserTB:
‘’’ 用户增加、查询 ‘’’
def __init__(self, user, pwd):
self.user = user
self.pwd = pwd
# 查询用户名
def selectUser(self):
sql = "select * from user where USERNAME = '%s'" % self.user
result = func(sql)
return result
# 查询用户名密码
def selectUserPwd(self):
sql = "select * from user where USERNAME = '%s' and PASSWORD = '%s' limit 1" % (self.user, self.pwd)
result = func(sql)
return result
# 插入
def insetinto(self):
sql = "INSERT INTO user (USERNAME,PASSWORD) VALUES ('%s','%s')" % (self.user, self.pwd)
# print('user',self.user)
# print('pwd',self.pwd)
result = func(sql)
# print('result',result)
return result
使用
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# 导入flask模块
from flask import Blueprint, Flask, request
# 导入json模块
import json
# 导入自定义模块
from Utils.utils import *
# 导入mySQL_config
from sql.user import UserTB
import base64
# 创建类的实例
user = Blueprint('user', __name__)
# 登录
# @param:{string} user 用户名
# @param:{string} pwd 密码
# @returns:{json}
#确定服务器ip地址
host_ip='192.168.1.105'
# 登入逻辑
@user.route('/login', methods=['POST'])
def login():
if request.method == 'POST':
# POST、GET:
# request.form获得所有post参数放在一个类似dict类中,to_dict()是字典化
# 单个参数可以通过request.form.to_dict().get("xxx","")获得
param = request.form.to_dict()
if param.get("username") != None and param.get("password") != None:
if param.get("username") != "" and len(param.get("username")) < 8 and param.get("password") != "" and len(
param.get("password")) < 20:
user = UserTB(param.get("username"), param.get("password"))
user = user.selectUser()
if user == ():
content = json.dumps(formatres(False, {}, "用户名不存在"), ensure_ascii=False)
else:
userPwd = UserTB(param.get("username"), param.get("password"))
userPwd = userPwd.selectUserPwd()
if userPwd == ():
content = json.dumps(formatres(False, {}, "用户名密码不正确"), ensure_ascii=False)
else:
content = json.dumps(formatres(True, {}, "登录成功"), ensure_ascii=False)
else:
content = json.dumps(formatres(False, {}, "用户名密码为空或者过长"), ensure_ascii=False)
else:
content = json.dumps(formatres(False, {}, "请检查参数"), ensure_ascii=False)
else:
content = json.dumps(formatres(False, {}, "101"))
resp = Response_headers(content)
return content
# 注册逻辑
@user.route('/register', methods=['POST'])
def register():
if request.method == 'POST':
param = request.form.to_dict()
if param.get("username") != None and param.get("password") != None and param.get("password2") != None:
if param.get("password") == param.get("password2"):
if param.get("username") != "" and len(param.get("username")) < 8 and param.get("password") != "" and len(
param.get("password")) < 20:
user = UserTB(param.get("username"), param.get("password"))
user = user.selectUser()
if user != ():
content = json.dumps(formatres(False, {}, "用户名已存在"), ensure_ascii=False)
else:
try:
insetintouser = UserTB(param.get("username"), param.get("password"))
insetintouser = insetintouser.insetinto()
if insetintouser == ():
content = json.dumps(formatres(True, {}, "用户创建成功"), ensure_ascii=False)
else:
content = json.dumps(formatres(False, {}, "用户创建失败"), ensure_ascii=False)
except:
content = json.dumps(formatres(False, {}, "用户创建失败,可能用户名过长"), ensure_ascii=False)
else:
content = json.dumps(formatres(False, {}, "用户名密码为空或者过长"), ensure_ascii=False)
else:
content = json.dumps(formatres(False, {}, "两次输入密码不一致"), ensure_ascii=False)
else:
content = json.dumps(formatres(False, {}, "请检查参数"), ensure_ascii=False)
else:
content = json.dumps(formatres(False, {}, "101"))
resp = Response_headers(content)
return content