198 lines
6.0 KiB
Python
198 lines
6.0 KiB
Python
"""
@time: 2024-07-18
@author: zhang
@param: pdf_file

PDF file printing
"""
|
|
import win32print
|
|
import win32api
|
|
import os
|
|
import threading
|
|
from flask import Flask, request, jsonify
|
|
from PyPDF2 import PdfWriter, PdfReader
|
|
from flask_cors import CORS
|
|
import ctypes
|
|
import pystray
|
|
from PIL import Image
|
|
from pystray import MenuItem
|
|
# import requests
|
|
import socket
|
|
import logging
|
|
from logging.handlers import RotatingFileHandler
|
|
import time
|
|
import threading
|
|
|
|
|
|
# Directory that contains this script; the log file is written next to it.
CUR_DIR = os.path.dirname(os.path.abspath(__file__))
log_path = os.path.join(CUR_DIR, 'printer.log')

# One shared line format for both the file and the console output.
formatter = logging.Formatter('[%(asctime)s] [%(filename)s:%(lineno)d] [%(levelname)s]- %(message)s')

# Rotating log file: INFO and above, 10 MiB per file, one backup kept.
file_hander = RotatingFileHandler(log_path, maxBytes=10 * 1024 * 1024, backupCount=1)
file_hander.setLevel(logging.INFO)
file_hander.setFormatter(formatter)

# Console output: everything from DEBUG upwards.
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.DEBUG)
console_handler.setFormatter(formatter)

# Module logger; the handlers above apply their own, stricter levels.
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
logger.addHandler(file_hander)
logger.addHandler(console_handler)

# Shell command that force-kills the GxPrint.exe process.
kill_command = 'taskkill /f /im GxPrint.exe'
# Shell command that opens a PowerShell window following the tail of the log.
log_command = f'start "" powershell -Command "Get-Content -Path \'{log_path}\' -Tail 10 -Wait"'
|
|
|
|
|
|
|
|
# Create the Flask application that serves the print endpoints.
app = Flask(__name__)
# Emit non-ASCII characters verbatim in JSON responses instead of \u escapes.
app.json.ensure_ascii = False
# Allow cross-origin requests from any origin, without credentials.
CORS(app, origins="*", supports_credentials=False)

# Module-level tray icon used by the request handlers for failure
# notifications; it is None until a tray icon has been assigned.
icon: pystray.Icon = None
|
|
|
|
|
|
class MyPrint:
    """System-tray front end that hosts the Flask print service.

    Creating an instance immediately tries to bind the single-instance guard
    socket; ``run`` then starts the tray icon and the HTTP server.
    """

    # TCP port used for the single-instance guard socket and for Flask.
    PORT = 8080

    def __init__(self) -> None:
        self.is_one_instance = False  # True once the guard socket is bound
        self.is_working = False  # only ever reset to False within this class
        self.icon = None  # tray icon, created in run()
        self.check_one_instance()

    def check_one_instance(self):
        """Set ``is_one_instance`` according to whether the guard socket binds.

        A socket is bound to (hostname, PORT); if binding fails the flag is
        left False and ``run`` will refuse to start.
        """
        # The socket is kept in a module-level global so it stays open, and
        # the port stays claimed, independently of this instance's lifetime.
        global s
        try:
            s = socket.socket()
            s.bind((socket.gethostname(), self.PORT))
        except OSError as exc:
            # Record why start-up is being refused instead of failing silently.
            logger.warning("端口 %s 绑定失败,程序可能已在运行: %s", self.PORT, exc)
            self.is_one_instance = False
        else:
            self.is_one_instance = True

    def close(self):
        """Tray-menu handler: stop the tray icon, then run ``kill_command``."""
        self.is_working = False
        if self.icon is not None:
            self.icon.stop()
        logger.info("程序退出")
        os.system(kill_command)

    def log(self):
        """Tray-menu handler: run ``log_command`` to show the live log."""
        os.system(log_command)

    def run(self):
        """Start the tray icon in a background thread and serve Flask.

        Returns immediately when the single-instance check failed. Otherwise
        this call blocks inside ``app.run``.
        """
        global icon
        if not self.is_one_instance:
            return

        menu = (MenuItem('实时日志', self.log), MenuItem('退出', self.close))
        image = Image.open("001.ico")
        title = "打印机"
        self.icon = pystray.Icon(title, image, "标签打印", menu)
        # Publish the icon at module level: the request handlers call
        # ``icon.notify`` on that global, which would otherwise stay None.
        icon = self.icon
        threading.Thread(target=self.icon.run, daemon=True).start()

        # NOTE(review): host 0.0.0.0 makes the service reachable from other
        # machines on the network - confirm this exposure is intended.
        app.run(port=self.PORT, debug=False, host='0.0.0.0')
|
|
|
|
|
|
@app.route('/print/', methods=['POST', 'OPTIONS'])
def pdf_printer():
    """Print an uploaded PDF on the printer named in the request.

    Reads the ``file`` upload and the ``printer_name`` form field, saves the
    upload to a temporary file next to this script, makes the named printer
    the default printer and hands the file to the shell "print" verb. The
    temporary file is deleted by ``del_file`` on a background thread.

    Returns:
        ``({}, 200)`` on success, ``printer_name_error`` with 400 when no
        printer name was supplied, ``print_failed`` with 500 on any error
        while saving or printing.
    """
    import uuid  # local import: only needed here for unique temp file names

    if request.method == 'OPTIONS':  # CORS preflight request
        return jsonify({"message": "OK"}), 200

    file = request.files['file']
    printer_name = request.form.get('printer_name', None)
    # Validate before touching the disk so a bad request leaves no file behind.
    if not printer_name:
        return jsonify({"err_msg": "打印机名称不正确","err_code": "printer_name_error"}), 400

    # A unique name per request: with one shared file name, concurrent
    # requests overwrite each other and an earlier request's delayed cleanup
    # can delete the file of a later one.
    temp_pdf_path = os.path.join(
        os.path.dirname(__file__), f'temp_pdf_{uuid.uuid4().hex}.pdf')
    logger.info(f"Saving file to {temp_pdf_path}")

    try:
        file.save(temp_pdf_path)

        # Opening the printer serves as a check on the name; the handle is
        # not needed afterwards, so close it immediately rather than leak it.
        printer_handle = win32print.OpenPrinter(printer_name)
        win32print.ClosePrinter(printer_handle)
        win32print.SetDefaultPrinter(printer_name)

        win32api.ShellExecute(0, "print", temp_pdf_path, None, ".", 0)
        logger.info("打印成功")
    except Exception as e:
        logger.error(f"打印失败: {e}")
        # The tray icon may not have been assigned; never let the
        # notification itself break the error response.
        if icon is not None:
            icon.notify(message=f"打印失败: {e}", title="标签打印")
        return jsonify({"err_msg": "打印失败", "err_code": "print_failed"}), 500
    finally:
        # Remove the temporary file later, on a separate thread, so the
        # response is not delayed by the cleanup wait.
        threading.Thread(target=del_file, args=(temp_pdf_path,)).start()

    return jsonify({}), 200
|
|
|
|
def del_file(path, delay=5):
    """Delete the file at ``path`` after waiting ``delay`` seconds.

    Intended to run on a background thread. A file that is already gone is
    ignored; any other failure to delete is logged instead of raised, so the
    thread never dies with an unhandled exception.

    Args:
        path: File to remove.
        delay: Seconds to wait before removing it (default 5).
    """
    time.sleep(delay)
    try:
        os.remove(path)
    except FileNotFoundError:
        # Nothing left to clean up.
        pass
    except OSError as exc:
        # e.g. the file is still held open by another process.
        # getLogger(__name__) resolves to this module's configured logger.
        logging.getLogger(__name__).warning("临时文件删除失败 %s: %s", path, exc)
|
|
|
|
def save_pdf_from_binary(pdf_binary, file_path):
    """Write the binary PDF data in ``pdf_binary`` to ``file_path``.

    The target is opened in binary write mode, so an existing file at that
    path is overwritten.
    """
    with open(file_path, mode='wb') as pdf_out:
        pdf_out.write(pdf_binary)
|
|
|
|
# Serializes /prints/ jobs: only one request talks to the printer at a time.
request_lock = threading.Lock()


def _send_tsc_commands(tsclibrary, print_commands):
    """Send each command string in ``print_commands`` to the open port."""
    for item in print_commands:
        if 'WINTEXT' in item:
            # Split into at most eight fields so that commas inside the last
            # field are kept instead of being silently cut off.
            # NOTE(review): presumably x, y, height, rotation, style,
            # underline, font face, text - confirm against the TSCLIB docs.
            fields = item.replace('WINTEXT ', '').split(',', 7)
            if len(fields) != 8:
                raise ValueError(f"WINTEXT 指令参数不足: {item}")
            tsclibrary.windowsfontW(*fields)
        else:
            tsclibrary.sendcommandW(item)


@app.route('/prints/', methods=['POST', 'OPTIONS'])
def str_printer():
    """Send a list of printer command strings to a TSC label printer.

    Expects a JSON body with ``printer_name`` (string) and
    ``printer_commands`` (list of command strings). Items containing
    ``WINTEXT`` are forwarded to ``windowsfontW``; all others go to
    ``sendcommandW``.

    Returns:
        ``({}, 200)`` on success; 400 with ``printer_name_error`` or
        ``printer_commands_error`` for invalid input; 503 with
        ``system_busy`` when the printer lock is not obtained within 30
        seconds; 500 with ``print_failed`` when loading the DLL or printing
        fails.
    """
    if request.method == 'OPTIONS':  # CORS preflight request
        return jsonify({"message": "OK"}), 200

    # Validate the payload before waiting for the lock, so invalid requests
    # are rejected immediately and never reach the printer library.
    payload = request.json
    if not isinstance(payload, dict):
        payload = {}
    print_commands = payload.get('printer_commands', None)
    print_name = payload.get('printer_name', None)
    if not isinstance(print_name, str) or not print_name:
        logger.error(f"打印机名称无效: {print_name}")
        return jsonify({"err_msg": "打印机名称无效","err_code": "printer_name_error"}), 400
    if not isinstance(print_commands, list):
        logger.error("打印指令无效")
        return jsonify({"err_msg": "打印指令无效", "err_code": "printer_commands_error"}), 400

    if not request_lock.acquire(timeout=30):
        logger.warning("打印请求超时,系统繁忙")
        return jsonify({
            "err_msg": "系统繁忙,请稍后再试",
            "err_code": "system_busy"
        }), 503

    try:
        try:
            tsclibrary = ctypes.WinDLL(".//TSCLIB.dll")
        except OSError as e:
            # A missing or unloadable DLL gets the same JSON error shape as
            # any other print failure.
            logger.error(f"打印失败: {e}")
            return jsonify({"err_msg": "打印失败", "err_code": "print_failed"}), 500

        try:
            # NOTE(review): the return value of openportW is not checked;
            # confirm how TSCLIB reports an unknown printer.
            tsclibrary.openportW(print_name)
        except Exception as e:
            logger.error(f"打印机名称无效: {e}")
            return jsonify({"err_msg": "打印机名称无效","err_code": "printer_name_error"}), 400

        try:
            try:
                tsclibrary.clearbuffer()
                _send_tsc_commands(tsclibrary, print_commands)
            finally:
                # Always release the port, even when a command fails.
                tsclibrary.closeport()
            logger.info("打印成功")
        except Exception as e:
            logger.error(f"打印失败: {e}")
            # The tray icon may not have been assigned; never let the
            # notification itself break the error response.
            if icon is not None:
                icon.notify(message=f"打印失败: {e}", title="标签打印")
            return jsonify({"err_msg": "打印失败", "err_code": "print_failed"}), 500

        return jsonify({}), 200
    finally:
        request_lock.release()
|
|
|
|
|
|
if __name__ == '__main__':
    # Script entry point: build the tray application, then start it.
    print_service = MyPrint()
    print_service.run()
|
|
|
|
# Build command: pyinstaller --onefile --windowed --icon=001.ico GxPrint.py