Skip to main content

常用函数封装

Project description

IUpdatable

Common function package 封装常用函数

详细文档:https://www.cnblogs.com/IUpdatable/articles/12500039.html

Installation

首次安装:

pip install iupdatable

更新安装:

pip install --upgrade iupdatable

更新日志:

v 0.3.1

  • SelfCleaningBackgroundService中的后台任务函数改成异步函数;

v 0.3.0

  • 仿 C# 中的 File 类,完善了 File 类
  • 仿 C# 中的 Directory 类,完善了 Directory 类
  • Timer 中新增一些与 datetime 格式化相关的函数
  • 新增 SelfCleaningBackgroundService 类,用于实现自清理的后台服务

File 类新增函数演示

import asyncio
import os
from iupdatable import File
import datetime

# 测试文件路径
test_file = "test_file.txt"
test_file_copy = "test_file_copy.txt"
test_file_move = "test_file_move.txt"
test_file_replace = "test_file_replace.txt"
test_file_backup = "test_file_backup.txt"


async def run_tests():
    print("开始测试File类的方法")

    # 测试create方法
    print("\n测试 create 方法")
    File.create(test_file)
    print(f"文件创建成功: {File.exists(test_file)}")

    # 测试write_all_text方法
    print("\n测试 write_all_text 方法")
    File.write_all_text(test_file, "Hello, World!")
    print(f"文件内容: {File.read_all_text(test_file)}")

    # 测试append_all_text方法
    print("\n测试 append_all_text 方法")
    File.append_all_text(test_file, "\nAppended text")
    print(f"文件内容: {File.read_all_text(test_file)}")

    # 测试write_all_lines方法
    print("\n测试 write_all_lines 方法")
    File.write_all_lines(test_file, ["Line 1", "Line 2", "Line 3"])
    print(f"文件内容: {File.read_all_lines(test_file)}")

    # 测试append_all_lines方法
    print("\n测试 append_all_lines 方法")
    File.append_all_lines(test_file, ["Line 4", "Line 5"])
    print(f"文件内容: {File.read_all_lines(test_file)}")

    # 测试copy方法
    print("\n测试 copy 方法")
    File.copy(test_file, test_file_copy)
    print(f"复制成功: {File.exists(test_file_copy)}")

    # 测试move方法
    print("\n测试 move 方法")
    File.move(test_file_copy, test_file_move)
    print(f"移动成功: {File.exists(test_file_move) and not File.exists(test_file_copy)}")

    # 测试replace方法
    print("\n测试 replace 方法")
    File.write_all_text(test_file_replace, "Original content")
    File.replace(test_file, test_file_replace, test_file_backup)
    print(f"替换成功: {File.read_all_text(test_file_replace) == File.read_all_text(test_file)}")
    print(f"备份成功: {File.exists(test_file_backup)}")

    # 测试get_creation_time方法
    print("\n测试 get_creation_time 方法")
    creation_time = File.get_creation_time(test_file)
    print(f"创建时间: {creation_time}")

    # 测试get_last_access_time方法
    print("\n测试 get_last_access_time 方法")
    last_access_time = File.get_last_access_time(test_file)
    print(f"最后访问时间: {last_access_time}")

    # 测试get_last_write_time方法
    print("\n测试 get_last_write_time 方法")
    last_write_time = File.get_last_write_time(test_file)
    print(f"最后修改时间: {last_write_time}")

    # 测试set_creation_time方法
    print("\n测试 set_creation_time 方法")
    new_creation_time = datetime.datetime.now() - datetime.timedelta(days=1)
    File.set_creation_time(test_file, new_creation_time)
    print(f"新创建时间: {File.get_creation_time(test_file)}")

    # 测试set_last_access_time方法
    print("\n测试 set_last_access_time 方法")
    new_last_access_time = datetime.datetime.now() - datetime.timedelta(hours=2)
    File.set_last_access_time(test_file, new_last_access_time)
    print(f"新最后访问时间: {File.get_last_access_time(test_file)}")

    # 测试set_last_write_time方法
    print("\n测试 set_last_write_time 方法")
    new_last_write_time = datetime.datetime.now() - datetime.timedelta(minutes=30)
    File.set_last_write_time(test_file, new_last_write_time)
    print(f"新最后修改时间: {File.get_last_write_time(test_file)}")

    # 测试read_all_bytes方法
    print("\n测试 read_all_bytes 方法")
    bytes_content = File.read_all_bytes(test_file)
    print(f"文件字节内容: {bytes_content[:20]}...")

    # 测试write_all_bytes方法
    print("\n测试 write_all_bytes 方法")
    new_bytes = b"New binary content"
    File.write_all_bytes(test_file, new_bytes)
    print(f"新文件字节内容: {File.read_all_bytes(test_file)}")

    # 测试异步方法
    print("\n测试异步方法")
    await File.write_all_text_async(test_file, "Async content")
    async_content = await File.read_all_text_async(test_file)
    print(f"异步读写结果: {async_content}")

    # 测试delete方法
    print("\n测试 delete 方法")
    for file in [test_file, test_file_move, test_file_replace, test_file_backup]:
        if File.exists(file):
            success = File.delete(file)
            print(f"删除 {file}: {'成功' if success else '失败'}")

    print("\n测试完成")

# 运行测试
asyncio.run(run_tests())

Directory 新增函数调用演示

import datetime
import os
from iupdatable import Directory

# 设置测试目录
test_dir = os.path.join(os.getcwd(), "test_directory")
sub_dir = os.path.join(test_dir, "sub_directory")
test_file = os.path.join(test_dir, "test_file.txt")

print("1. 测试 create_directory")
result = Directory.create_directory(test_dir)
print(f"创建目录结果: {result}")

print("\n2. 测试 exists")
print(f"目录是否存在: {Directory.exists(test_dir)}")

print("\n3. 测试 get_current_directory")
print(f"当前目录: {Directory.get_current_directory()}")

print("\n4. 测试 get_creation_time")
print(f"目录创建时间: {Directory.get_creation_time(test_dir)}")

print("\n5. 测试 create_directory (子目录)")
sub_dir_result = Directory.create_directory(sub_dir)
print(f"创建子目录结果: {sub_dir_result}")

print("\n6. 测试 get_directories")
print(f"子目录列表: {Directory.get_directories(test_dir)}")

print("\n7. 测试 get_directory_root")
print(f"目录根: {Directory.get_directory_root(test_dir)}")

print("\n8. 测试 get_files")
# 创建一个测试文件
with open(test_file, 'w') as f:
    f.write("This is a test file.")
print(f"目录中的文件: {Directory.get_files(test_dir)}")

print("\n9. 测试 get_file_system_entries")
print(f"目录中的所有项: {Directory.get_file_system_entries(test_dir)}")

print("\n10. 测试 get_last_access_time")
print(f"最后访问时间: {Directory.get_last_access_time(test_dir)}")

print("\n11. 测试 get_last_write_time")
print(f"最后修改时间: {Directory.get_last_write_time(test_dir)}")

print("\n12. 测试 get_logical_drives")
print(f"逻辑驱动器: {Directory.get_logical_drives()}")

print("\n13. 测试 get_parent")
print(f"父目录: {Directory.get_parent(test_dir)}")

print("\n14. 测试 move")
move_dir = os.path.join(os.getcwd(), "moved_directory")
Directory.move(test_dir, move_dir)
print(f"移动后目录是否存在: {Directory.exists(move_dir)}")

print("\n15. 测试 set_creation_time")
new_time = datetime.datetime.now() - datetime.timedelta(days=1)
Directory.set_creation_time(move_dir, new_time)
print(f"设置后的创建时间: {Directory.get_creation_time(move_dir)}")

print("\n16. 测试 set_last_access_time")
new_access_time = datetime.datetime.now() - datetime.timedelta(hours=2)
Directory.set_last_access_time(move_dir, new_access_time)
print(f"设置后的最后访问时间: {Directory.get_last_access_time(move_dir)}")

print("\n17. 测试 set_last_write_time")
new_write_time = datetime.datetime.now() - datetime.timedelta(minutes=30)
Directory.set_last_write_time(move_dir, new_write_time)
print(f"设置后的最后修改时间: {Directory.get_last_write_time(move_dir)}")

print("\n18. 测试 delete")
Directory.delete(move_dir, recursive=True)
print(f"删除后目录是否存在: {Directory.exists(move_dir)}")

print("\n19. 测试 get_absolute_directory")

print(f"当前路径的绝对路径: {Directory.get_absolute_directory("./")}")

SelfCleaningBackgroundService类调用演示

import asyncio
from iupdatable import Timer
from iupdatable.services.SelfCleaningBackgroundService import SelfCleaningBackgroundService


class Demo(SelfCleaningBackgroundService):

    def __init__(self, name, is_debug=False, auto_start=True):
        super().__init__(is_debug=is_debug, auto_start=auto_start)
        self._name = name
        self._counter = 0

    async def _background_task(self):
        while True:
            self._counter += 1
            print(f"{Timer.get_now_time_str()}: {self._name} counter: {self._counter}")
            await asyncio.sleep(1)


async def main():
    # 创建一个自动启动的 Demo 实例
    demo1 = Demo(name="demo1", is_debug=True)
    print(f"{Timer.get_now_time_str()}: Demo1 created (auto-start)")
    await asyncio.sleep(3)

    # 停止 demo1
    print(f"{Timer.get_now_time_str()}: Stopping demo1")
    demo1.stop()
    await asyncio.sleep(3)

    # 重新启动 demo1
    print(f"{Timer.get_now_time_str()}: Restarting demo1")
    demo1.start()
    await asyncio.sleep(2)

    # 创建一个不自动启动的 Demo 实例
    demo2 = Demo(name="demo2", is_debug=True, auto_start=False)
    print(f"{Timer.get_now_time_str()}: Demo2 created (not auto-start)")
    await asyncio.sleep(2)

    # 手动启动 demo2
    print(f"{Timer.get_now_time_str()}: Starting demo2")
    demo2.start()
    await asyncio.sleep(3)

    print(f"{Timer.get_now_time_str()}: Main function ending")

if __name__ == "__main__":
    asyncio.run(main())

v 0.2.4

  • 增加 WeiXinCrawler 类, 用于抓取微信公众号历史消息。

v 0.2.2

  • 增加 Timer 类

Timer 主要函数:

  • 获取 Unix 时间戳(精确到秒):timestamp、unix、unix10
  • 获取 Unix 时间戳(精确到毫秒):timestamp13、unix13
  • 随机等待若干秒:sleep_range、sleep_range_async
  • Unix 时间戳转换成 datetime:unix_to_datetime、timestamp_to_datetime
  • Unix 时间戳转换成 datetime 字符串:unix_to_datetime_str、timestamp_to_datetime_str

v 0.2.0

修复多进程下写入日志报错问题

  • 多进程下操作日志会报错:[WinError 32] 另一个程序正在使用此文件,进程无法访问。 这里通过添加 concurrent_log_handler 模块 替换掉系统内置的 TimedRotatingFileHandler 解决。

v 0.1.9

  • 添加 Status 类

文件 - File

  • read: 读取文件
  • write: 写入文件
  • append:追加写入文件
  • append_new_line:新建一行,然后追加写入文件
  • read_lines: 按行一次性读取文件
  • write_lines:按行一次性写入文件
  • write_csv:写入CSV文件
  • read_csv:读取CSV文件
  • exist_within_extensions: 检查一个文件是否存在(在指定的几种格式中)
  • get_file_path_within_extensions: 获取一个文件的路径(在指定的几种格式中)

简单实例:

from iupdatable.system.io.File import File


sample_text = 'this is sample text.'
sample_texts = ['123', 'abc', 'ABC']
append_text = 'this is append text.'

# 写入
File.write('1.txt', sample_text)
File.write_lines('2.txt', sample_texts)

File.append('1.txt', append_text)
File.append_new_line('2.txt', append_text)

# 读取
read_text1 = File.read('1.txt')
read_text2 = File.read_lines('2.txt')

# 打印输出
print(read_text1)
print(read_text2)

日志 - logging

简单实例:

from iupdatable.logging.Logger import Logger
from iupdatable.logging.LogLevel import LogLevel


def test_logging():
    # 日志等级:
    # CRITICAL  同:FATEL,下同
    # ERROR
    # WARNING
    # INFO
    # DEBUG
    # NOTSET    按照 WARNING 级别输出

    # 设置为 DEBUG,输出所有信息
    # 设置为 WARNING, INFO、DEBUG 级别的日志就不会输出
    Logger.get_instance().config(log_level=LogLevel.DEBUG)

    Logger.get_instance().debug('debug message1')
    Logger.get_instance().info('info message1')
    Logger.get_instance().warning('warning message1')
    Logger.get_instance().error('error message1')
    Logger.get_instance().debug('debug message1', is_with_debug_info=True)  # 要想输出具体的调试信息
    Logger.get_instance().fatal('fatal message1')
    Logger.get_instance().critical('critical message1')  # fatal = critical

    # 也可以输出变量
    abc = [1, 2, 4]
    Logger.get_instance().info(abc)


test_logging()

Base64

  • encode:base64编码
  • decode:base64解码
  • encode_multilines:base64多行解码
  • decode_multilines:base64多行解码

CSProduct

from iupdatable.system.hardware import CSProduct

# 一次性获取所有的CSProduct信息
cs_product = CSProduct.get()
print("CSProduct: " + str(cs_product))
print(cs_product["Caption"])

# 或者
# 使用各项函数单独获取
print("Caption: " + CSProduct.get_caption())
print("Description: " + CSProduct.get_description())
print("IdentifyingNumber: " + CSProduct.get_identifying_number())
print("Name: " + CSProduct.get_name())
print("SKUNumber: " + CSProduct.get_sku_number())
print("UUID: " + CSProduct.get_uuid())
print("Vendor: " + CSProduct.get_vendor())
print("Version: " + CSProduct.get_version())

UMeng

友盟统计

这里使用了自定义事件统计的功能

创建网站类型的统计,第一个参数是统计代码中的id

UMeng.log_stat(1211111111, '来源页面', '目录', '行为', '标签')

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

iupdatable-0.3.1.tar.gz (27.7 kB view details)

Uploaded Source

File details

Details for the file iupdatable-0.3.1.tar.gz.

File metadata

  • Download URL: iupdatable-0.3.1.tar.gz
  • Upload date:
  • Size: 27.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.1.1 CPython/3.10.0

File hashes

Hashes for iupdatable-0.3.1.tar.gz
Algorithm Hash digest
SHA256 f7b32f84a5b1f3dd6c00d7df39463072047aebd67e1ee826bb09f9b4f4240c34
MD5 af32ee519470be38ab84b2e32431f96f
BLAKE2b-256 b1f1a9279883c320a5d7c0972d34911db5a8debf85dcf99897a92c5e81d5054d

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page