def doHome():
self.http.form['strs'] = "Hello Python!"
return { self.doMD5()['md5']: "world"[::-1] }
def doHello():
self.contentType = "text/html; charset=utf-8"
return '<h1>Hello, Python API !</h1>'
def doTestMap(strs):
strs = self.http.form.get('strs')
if strs is None or strs == "": return { 'err': "请输入参数" }
intro_map = {
"A": "AAA",
"B": "BBB",
"C": "CCC"
}
intro = intro_map.get(strs, "无数据")
return intro
def doAsyncWaitTest():
res = { 'time1': int(time.time()) }
# 轻量级线程(协程)
asyncio.run(self.asyncTest())
res['time2'] = int(time.time()) # 获取秒级时间戳
return res
def doMonitor():
cpu = psutil.cpu_percent()
memory = psutil.virtual_memory().percent
return { 'msg': f'CPU Usage: {cpu}%, Memory Usage: {memory}%' }
def doRandom():
return { 'random': random.randint(1, 100) }
def doMD5(strs):
strs = self.http.form.get('strs')
if strs is None or strs == "": return { 'err': "请输入参数" }
return { 'md5': tools.MD5(strs) }
def doSha256():
return { 'sha256': tools.Sha256("hello world") }
def doBase64Encode(strs):
strs = self.http.form.get('strs')
if strs is None or strs == "":
return { 'err': "请输入参数" }
return { 'base64Encode': tools.base64Encode(strs) }
def doBase64Decode():
return { 'base64Decode': tools.base64Decode("aGVsbG8gd29ybGQ=") }
def doTestSqlite3():
db = tools().openDb('app_data/db.sqlite3')
res = db.query("""WITH
sqlite_master as (
select key+1 userid, value->>'name' name,
datetime(current_timestamp, 'localtime') regtime
from json_each(?)
)
select * from sqlite_master
""", [ json.dumps([
{ "name": "小明" },
{ "name": "小红" }
]) ])
db.closeDb()
return res
def doInitDB():
db = tools().openDb('app_data/db.sqlite3')
# 创建表,无数据返回的查询
upnum = db.none("""
create table if not exists users (
userid integer primary key autoincrement,
name text not null,
regtime datetime not null
)
""")
# 建立索引
# db.none("create unique index users_name on users(name)")
db.none("create index users_name on users(name)")
# 插入数据,有数据返回的查询
added = db.query("""
insert into users (name, regtime)
select value->>'name', datetime('now', 'localtime')
from json_each(?)
returning *
""", [ json.dumps([
{ "name": "小明" },
{ "name": "小红" },
{ "name": "小刚" }
]) ])
# 注意:如果当前页面没有 none 写数据,仅通过 query 执行查询时,需要声明数据库被修改了,以便关闭时自动提交事务,否则,数据库变更不会生效
# db.dbChanged = True
allrows = db.query("select * from users order by userid desc limit 5")
db.closeDb()
return { 'added': added, 'allrows': allrows, 'upnum': upnum }
def doTables():
db = tools().openDb('app_data/db.sqlite3')
res = db.query("select * from sqlite_master")
db.closeDb()
return { 'tables': res }
def doUsers():
db = tools().openDb('app_data/db.sqlite3')
res = db.query("select * from users order by userid desc limit 5")
db.closeDb()
return { 'data': res }
def doUsersIndex():
db = tools().openDb('app_data/db.sqlite3')
res = db.query("""
-- select name from sqlite_master where type='index' and tbl_name='users';
select name, sql from sqlite_master where type='index' and tbl_name='users';
""")
db.closeDb()
return { 'index': res }
def doTestAjax():
# return tools().ajax('http://rimifon.3vzhuji.cn/?r=weibo/topicList', { 'forumid': 3 })
res = tools().ajax('https://ucms.cn/api/Index/Api/distance', { 'id': 1 }) # 返回的 res 是一个包含距离值的数字或可转换为数字的字符串
try:
distance_km = float(res) / 1000 # 获取原始数据(单位:米),米转公里,先转换为浮点数
return { 'msg': f"Pojin 与 Rimifon 的距离:{round(distance_km, 2)} 公里" } # 四舍五入保留2位小数
except (ValueError, TypeError) as e:
return { 'error': f"距离计算错误: {str(e)}", 'raw_response': res }
def doTestAjaxJson():
res = tools().ajax('https://ucms.cn/api/Index/Api/distance2', { 'id': 1 })
try:
data = json.loads(res) # 解析 JSON
distance_km = float(data['distance']) / 1000 # 米转公里
return { 'msg': f"Pojin 与 Rimifon 的距离:{round(distance_km, 2)} 公里" }
except Exception as e:
return { 'error': f"解析失败: {str(e)}", 'response': res }
# except KeyError as e:
# return {'error': f"JSON中缺少必要字段: {str(e)}", 'raw_response': res}
# except json.JSONDecodeError as e:
# return {'error': f"JSON解析错误: {str(e)}", 'raw_response': res}
def doTestPage():
res = { "code": 0, "numbers": [ 1, 2, 3, 4, 5 ]}
db = tools().openDb('app_data/db.sqlite3')
res['data'] = db.query("select * from users order by userid desc limit 5")
db.closeDb()
self.contentType = "text/html; charset=UTF-8"
return tools.render('api/test.html', res) # tools.render 是个静态方法,不需要新建实例,tools() 新建了一个实例
def doTestFile():
# 获取当前脚本的绝对路径
current_dir = os.path.dirname(os.path.abspath(__file__))
folder_path = os.path.join(current_dir, "newfolder")
if not os.path.exists(folder_path):
os.makedirs(folder_path, exist_ok=True)
# 创建文件的完整路径
file_path = os.path.join(folder_path, "test.txt")
# 创建并写入文件
with open(file_path, "w", encoding='utf-8') as f:
f.write("hello python!")
# 读取文件内容
with open(file_path, "r", encoding='utf-8') as f:
content = f.read()
return { 'content': content, 'time': time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), 'folder': os.listdir(folder_path) }
# os.chdir(file_path) # 切换到当前工作目录
# os.getcwd() # 获取当前工作目录
# os.rmdir(file_path) # 删除一个空目录
# os.remove("test.txt") # 删除文件
# os.rename("test.txt", "test2.txt") # 改名
# os.chmod("test.txt", 0o755) # 设置文件权限
def doJob():
def jobTask():
print("Job running...")
# 设置带标签的任务
schedule.every(1).minutes.do(jobTask).tag('minutely')
schedule.every().day.at("10:30").do(jobTask).tag('daily')
# 运行一段时间后取消任务
start_time = time.time()
while True:
schedule.run_pending()
time.sleep(1)
# 运行5分钟后取消所有任务
if time.time() - start_time > 300: # 300秒 = 5分钟
print("取消所有定时任务")
schedule.clear()
break
# 或者取消特定类型的任务
# if time.time() - start_time > 60: # 60秒后取消每分钟任务
# print("取消每分钟任务")
# schedule.clear('minutely')
def doExec1():
ns = {}
exec("""
def bar(y):
return y+10
""", ns)
return { 'bar': ns['bar'](5) }
def doExec2():
# exec("def foo(x):\n\treturn x*2", globals())
exec("""
def foo(x):
return x*2
""", globals())
return { 'foo': foo(4) }