Flask_Basic

项目后端的人两个月成效不大,耐不住性子我也加入学习了Flask一些基础,以下进行简要记录


  • Flask的运行

if __name__ == '__main__':
    app.run(debug=True)
  • Flask的基本运作

@app.route('/up_photo', methods=['POST'], strict_slashes=False)
    def func():
        ==============
        #add more
        ==============
        return jsonify({...})
  • Flask的blueprint机制

    在app.py加入:


    app = Flask(name)
    app.register_blueprint(user, url_prefix=’/user’)
    注:主页里是【@app.route(‘/xxx’, methods=[‘POST’], strict_slashes=False)】这样

    在对应api的py文件里加入:

    导入flask模块

    from flask import Blueprint, Flask, request

    创建类的实例

    user = Blueprint(‘user’, name)
    注:api里是【@user.route(‘/xxx’, methods=[‘POST’])】这样

    对api进行访问,比如上述例子:

    127.0.0.1:5000/user/xxx

  • Flask的数据库访问

    建立连接


    from DBUtils.PersistentDB import PersistentDB

    导入数据库模块

    import pymysql

    创建数据库连接池

    POOL = PersistentDB(
    creator=pymysql, # 使用链接数据库的模块
    maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制
    setsession=[], # 开始会话前执行的命令列表。
    ping=0, # ping MySQL服务端,检查是否服务可用。
    closeable=False, # 如果为False时, conn.close() 实际上被忽略,供下次使用,再线程关闭时,才会自动关闭链接。如果为True时, conn.close()则关闭链接,那么再次调用pool.connection时就会报错,因为已经真的关闭了连接(pool.steady_connection()可以获取一个新的链接)
    threadlocal=None, # 本线程独享值得对象,用于保存链接对象,如果链接对象被重置
    host=’127.0.0.1’,
    port=3306,
    user=’root’,
    password=’123456’,
    database=’test’,
    charset=’utf8’
    )

    执行函数


    def func(sql):
    conn = POOL.connection(shareable=False)
    cursor = conn.cursor()
    cursor.execute(sql)
    result = cursor.fetchall()
    cursor.close()
    conn.close()
    return result

    封装Sql

    #!/usr/bin/python3

    -- coding: utf-8 --

from mySQL_config import func

定义用户表类

class UserTB:
‘’’ 用户增加、查询 ‘’’

def __init__(self, user, pwd):
    self.user = user
    self.pwd = pwd

# 查询用户名
def selectUser(self):
    sql = "select * from user where USERNAME = '%s'" % self.user
    result = func(sql)
    return result

# 查询用户名密码
def selectUserPwd(self):
    sql = "select * from user where USERNAME = '%s' and PASSWORD = '%s' limit 1" % (self.user, self.pwd)
    result = func(sql)
    return result

# 插入
def insetinto(self):
    sql = "INSERT INTO user (USERNAME,PASSWORD) VALUES ('%s','%s')" % (self.user, self.pwd)
    # print('user',self.user)
    # print('pwd',self.pwd)
    result = func(sql)
    # print('result',result)
    return result


使用

#!/usr/bin/python3
# -*- coding: utf-8 -*-
# 导入flask模块
from flask import Blueprint, Flask, request
# 导入json模块
import json
# 导入自定义模块
from Utils.utils import *
# 导入mySQL_config
from sql.user import UserTB
import base64

# 创建类的实例
user = Blueprint('user', __name__)

# 登录
# @param:{string}     user         用户名
# @param:{string}     pwd          密码
# @returns:{json}

#确定服务器ip地址
host_ip='192.168.1.105'

# 登入逻辑
@user.route('/login', methods=['POST'])
def login():
    if request.method == 'POST':
        # POST、GET:
        # request.form获得所有post参数放在一个类似dict类中,to_dict()是字典化
        # 单个参数可以通过request.form.to_dict().get("xxx","")获得
        param = request.form.to_dict()
        if param.get("username") != None and param.get("password") != None:
            if param.get("username") != "" and len(param.get("username")) < 8 and param.get("password") != "" and len(
                    param.get("password")) < 20:
                user = UserTB(param.get("username"), param.get("password"))
                user = user.selectUser()
                if user == ():
                    content = json.dumps(formatres(False, {}, "用户名不存在"), ensure_ascii=False)
                else:
                    userPwd = UserTB(param.get("username"), param.get("password"))
                    userPwd = userPwd.selectUserPwd()
                    if userPwd == ():
                        content = json.dumps(formatres(False, {}, "用户名密码不正确"), ensure_ascii=False)
                    else:
                        content = json.dumps(formatres(True, {}, "登录成功"), ensure_ascii=False)
            else:
                content = json.dumps(formatres(False, {}, "用户名密码为空或者过长"), ensure_ascii=False)
        else:
            content = json.dumps(formatres(False, {}, "请检查参数"), ensure_ascii=False)
    else:
        content = json.dumps(formatres(False, {}, "101"))
    resp = Response_headers(content)
    return content




# 注册逻辑
@user.route('/register', methods=['POST'])
def register():
    if request.method == 'POST':
        param = request.form.to_dict()
        if param.get("username") != None and param.get("password") != None and param.get("password2") != None:
            if param.get("password") == param.get("password2"):
                if param.get("username") != "" and len(param.get("username")) < 8 and param.get("password") != "" and len(
                        param.get("password")) < 20:
                    user = UserTB(param.get("username"), param.get("password"))
                    user = user.selectUser()
                    if user != ():
                        content = json.dumps(formatres(False, {}, "用户名已存在"), ensure_ascii=False)
                    else:
                        try:
                            insetintouser = UserTB(param.get("username"), param.get("password"))
                            insetintouser = insetintouser.insetinto()
                            if insetintouser == ():
                                content = json.dumps(formatres(True, {}, "用户创建成功"), ensure_ascii=False)
                            else:
                                content = json.dumps(formatres(False, {}, "用户创建失败"), ensure_ascii=False)
                        except:
                            content = json.dumps(formatres(False, {}, "用户创建失败,可能用户名过长"), ensure_ascii=False)
                else:
                    content = json.dumps(formatres(False, {}, "用户名密码为空或者过长"), ensure_ascii=False)
            else:
                content = json.dumps(formatres(False, {}, "两次输入密码不一致"), ensure_ascii=False)
        else:
            content = json.dumps(formatres(False, {}, "请检查参数"), ensure_ascii=False)
    else:
        content = json.dumps(formatres(False, {}, "101"))
    resp = Response_headers(content)
    return content