Python API 文档

这是自动生成的 API 文档
使用 Jinja2 模板引擎渲染
TestPage
1 Home:
@form { }
* 默认 API
[检视源码]
	def doHome():
		self.http.form['strs'] = "Hello Python!"
		return { self.doMD5()['md5']: "world"[::-1] }
2 Hello: 返回HTML格式的问候语
@form { }
* Hello 测试
[检视源码]
	def doHello():
		self.contentType = "text/html; charset=utf-8"
		return '<h1>Hello, Python API !</h1>'
2 TestMap:
@form { strs }
* TestMap
[检视源码]
	def doTestMap(strs):
		strs = self.http.form.get('strs')
		if strs is None or strs == "": return { 'err': "请输入参数" }
		intro_map = {
			"A": "AAA",
			"B": "BBB",
			"C": "CCC"
		}
		intro = intro_map.get(strs, "无数据")
		return intro
3 AsyncWaitTest:
@form { }
* 测试异步等待
[检视源码]
	def doAsyncWaitTest():
		res = { 'time1': int(time.time()) }
		# 轻量级线程(协程)
		asyncio.run(self.asyncTest())
		res['time2'] = int(time.time())	# 获取秒级时间戳
		return res
4 Monitor: 监控CPU和内存使用情况
@form { }
* 系统资源监控
[检视源码]
	def doMonitor():
		cpu = psutil.cpu_percent()
		memory = psutil.virtual_memory().percent
		return { 'msg': f'CPU Usage: {cpu}%, Memory Usage: {memory}%' }
5 Random: 生成1到100之间的随机数
@form { }
* 生成随机数
[检视源码]
	def doRandom():
		return { 'random': random.randint(1, 100) }
6 MD5:
@form { strs }
* MD5加密算法
* 对输入字符串进行MD5哈希计算
[检视源码]
	def doMD5(strs):
		strs = self.http.form.get('strs')
		if strs is None or strs == "": return { 'err': "请输入参数" }
		return { 'md5': tools.MD5(strs) }
7 Sha256: 对字符串进行 SHA-256 哈希计算
@form { }
* SHA-256 加密
[检视源码]
	def doSha256():
		return { 'sha256': tools.Sha256("hello world") }
8 Base64Encode: 对字符串进行 Base64 编码
@form { strs }
* Base64 编码
[检视源码]
	def doBase64Encode(strs):
		strs = self.http.form.get('strs')
		if strs is None or strs == "":
			return { 'err': "请输入参数" }
		return { 'base64Encode': tools.base64Encode(strs) }
9 Base64Decode: 对Base64字符串进行解码
@form { }
* Base64 解码
[检视源码]
	def doBase64Decode():
		return { 'base64Decode': tools.base64Decode("aGVsbG8gd29ybGQ=") }
10 TestSqlite3:
@form { }
* 测试数据库操作
[检视源码]
	def doTestSqlite3():
		db = tools().openDb('app_data/db.sqlite3')
		res = db.query("""WITH
			sqlite_master as (
				select key+1 userid, value->>'name' name,
				datetime(current_timestamp, 'localtime') regtime
				from json_each(?)
			)
			select * from sqlite_master
		""",  [ json.dumps([
			{ "name": "小明" },
			{ "name": "小红" }
		]) ])
		db.closeDb()
		return res
11 InitDB:
@form { }
* 创建表、插入数据
[检视源码]
	def doInitDB():
		db = tools().openDb('app_data/db.sqlite3')
		# 创建表,无数据返回的查询
		upnum = db.none("""
			create table if not exists users (
				userid integer primary key autoincrement,
				name text not null,
				regtime datetime not null
			)
		""")
		# 建立索引
		# db.none("create unique index users_name on users(name)")
		db.none("create index users_name on users(name)")
		# 插入数据,有数据返回的查询
		added = db.query("""
			insert into users (name, regtime)
			select value->>'name', datetime('now', 'localtime')
			from json_each(?)
			returning *
		""", [ json.dumps([
			{ "name": "小明" },
			{ "name": "小红" },
			{ "name": "小刚" }
		]) ])
		# 注意:如果当前页面没有 none 写数据,仅通过 query 执行查询时,需要声明数据库被修改了,以便关闭时自动提交事务,否则,数据库变更不会生效
		# db.dbChanged = True
		allrows = db.query("select * from users order by userid desc limit 5")
		db.closeDb()
		return { 'added': added, 'allrows': allrows, 'upnum': upnum }
12 Tables:
@form { }
* 查询数据表结构
[检视源码]
	def doTables():
		db = tools().openDb('app_data/db.sqlite3')
		res = db.query("select * from sqlite_master")
		db.closeDb()
		return { 'tables': res }
13 Users:
@form { }
* 查询 users 表
[检视源码]
	def doUsers():
		db = tools().openDb('app_data/db.sqlite3')
		res = db.query("select * from users order by userid desc limit 5")
		db.closeDb()
		return { 'data': res }
14 UsersIndex:
@form { }
* 查看 User 表索引
[检视源码]
	def doUsersIndex():
		db = tools().openDb('app_data/db.sqlite3')
		res = db.query("""
			-- select name from sqlite_master where type='index' and tbl_name='users';
			select name, sql from sqlite_master where type='index' and tbl_name='users';
		""")
		db.closeDb()
		return { 'index': res }
15 TestAjax:
@form { }
* 测试 Ajax
[检视源码]
	def doTestAjax():
		# return tools().ajax('http://rimifon.3vzhuji.cn/?r=weibo/topicList', { 'forumid': 3 })
		res = tools().ajax('https://ucms.cn/api/Index/Api/distance', { 'id': 1 })	# 返回的 res 是一个包含距离值的数字或可转换为数字的字符串
		try:
			distance_km = float(res) / 1000  # 获取原始数据(单位:米),米转公里,先转换为浮点数
			return { 'msg': f"Pojin 与 Rimifon 的距离:{round(distance_km, 2)} 公里" }	# 四舍五入保留2位小数
		except (ValueError, TypeError) as e:
			return { 'error': f"距离计算错误: {str(e)}", 'raw_response': res }
16 TestAjaxJson:
@form { }
* 测试 Ajax_Json
[检视源码]
	def doTestAjaxJson():
		res = tools().ajax('https://ucms.cn/api/Index/Api/distance2', { 'id': 1 })
		try:
			data = json.loads(res)  # 解析 JSON
			distance_km = float(data['distance']) / 1000  # 米转公里
			return { 'msg': f"Pojin 与 Rimifon 的距离:{round(distance_km, 2)} 公里" }
		except Exception as e:
			return { 'error': f"解析失败: {str(e)}", 'response': res }
		# except KeyError as e:
		# 	return {'error': f"JSON中缺少必要字段: {str(e)}", 'raw_response': res}
		# except json.JSONDecodeError as e:
		# 	return {'error': f"JSON解析错误: {str(e)}", 'raw_response': res}
17 TestPage:
@form { }
* 测试模板
[检视源码]
	def doTestPage():
		res = { "code": 0, "numbers": [ 1, 2, 3, 4, 5 ]}
		db = tools().openDb('app_data/db.sqlite3')
		res['data'] = db.query("select * from users order by userid desc limit 5")
		db.closeDb()
		self.contentType = "text/html; charset=UTF-8"
		return tools.render('api/test.html', res)	# tools.render 是个静态方法,不需要新建实例,tools() 新建了一个实例
18 TestFile:
@form { }
* 测试文件操作
[检视源码]
	def doTestFile():
		# 获取当前脚本的绝对路径
		current_dir = os.path.dirname(os.path.abspath(__file__))
		folder_path = os.path.join(current_dir, "newfolder")
		if not os.path.exists(folder_path):
			os.makedirs(folder_path, exist_ok=True)
		# 创建文件的完整路径
		file_path = os.path.join(folder_path, "test.txt")
		# 创建并写入文件
		with open(file_path, "w", encoding='utf-8') as f:
			f.write("hello python!")
		# 读取文件内容
		with open(file_path, "r", encoding='utf-8') as f:
			content = f.read()
		return { 'content': content, 'time': time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), 'folder': os.listdir(folder_path) }
		# os.chdir(file_path)	# 切换到当前工作目录
		# os.getcwd()	# 获取当前工作目录
		# os.rmdir(file_path)	# 删除一个空目录
		# os.remove("test.txt")	# 删除文件
		# os.rename("test.txt", "test2.txt")	# 改名
		# os.chmod("test.txt", 0o755)  # 设置文件权限
19 Job:
@form { }
* 定时任务
[检视源码]
	def doJob():
		def jobTask():
			print("Job running...")
		# 设置带标签的任务
		schedule.every(1).minutes.do(jobTask).tag('minutely')
		schedule.every().day.at("10:30").do(jobTask).tag('daily')
		# 运行一段时间后取消任务
		start_time = time.time()
		while True:
			schedule.run_pending()
			time.sleep(1)
			# 运行5分钟后取消所有任务
			if time.time() - start_time > 300:  # 300秒 = 5分钟
				print("取消所有定时任务")
				schedule.clear()
				break
			# 或者取消特定类型的任务
			# if time.time() - start_time > 60:  # 60秒后取消每分钟任务
			#     print("取消每分钟任务")
			#     schedule.clear('minutely')
20 Exec1:
@form { }
* Exec1
[检视源码]
	def doExec1():
		ns = {}
		exec("""
def bar(y):
	return y+10
""", ns)
		return { 'bar': ns['bar'](5) }
21 Exec2:
@form { }
* Exec2
[检视源码]
	def doExec2():
		# exec("def foo(x):\n\treturn x*2", globals())
		exec("""
def foo(x):
	return x*2
""", globals())
		return { 'foo': foo(4) }
参数录入
执行
取消