
高速下载器
大约 5 分钟
应用程序已打包并上传至蓝奏云:
link(提取密码:114514)
以下是源码。
import os
import shutil
import threading
import time
from pathlib import Path
from queue import Queue
from tkinter import *
from tkinter import filedialog, scrolledtext, messagebox, simpledialog
from urllib.parse import unquote, urlsplit

import requests
import ttkbootstrap as ttk
from ttkbootstrap.scrolled import ScrolledText
# https://dldir1v6.qq.com/weixin/Windows/WeChatSetup.exe
class EnhancedDownloader:
    """ttkbootstrap GUI that downloads one URL with several HTTP Range requests.

    The file size is read from a HEAD request, the byte range is split across
    worker threads that each stream their slice into ``temp_downloads/part_<n>``,
    and the parts are concatenated into the chosen save path once every worker
    has finished.

    Threading model: worker threads never touch Tk. They report byte counts
    through ``download_queue`` and ask for retry confirmation through
    ``prompt_queue``; both queues are consumed by the polling callback that
    ``monitor_progress`` schedules on the GUI thread.

    Constructing an instance builds the window and runs the Tk main loop, so
    the constructor only returns after the window has been closed.
    """

    LOG_FILE = "Download.log"

    def __init__(self):
        self.root = ttk.Window(themename="flatly")
        self.root.title("Download_faster(V3.1)")
        self.root.resizable(False, False)
        # Configuration
        self.chunk_size = 1024 * 1024  # bytes per streamed read (1 MiB)
        self.temp_dir = Path("temp_downloads")
        self.temp_dir.mkdir(exist_ok=True)  # make sure the part directory exists
        # State
        self.download_queue = Queue()  # byte counts reported by workers
        self.prompt_queue = Queue()  # retry questions waiting for the GUI thread
        self.progress = 0
        self.file_size = 0
        self.speed = 0
        self.last_update = time.time()
        self.downloaded_bytes = 0  # bytes since the last speed refresh
        self.downloaded_total = 0  # bytes since the download started
        self.download_time = time.time()
        self.download_failed = False  # set when a part was given up on
        self.is_downloading = False  # guards against concurrent downloads
        self.is_logging = False  # True while the log window is open
        self.nwin = None  # progress window, only present during a download
        self.ltx = 0  # right edge of the last bar drawn on the progress chart
        # Build the UI and enter the main loop
        self.root.protocol("WM_DELETE_WINDOW", self.on_close)
        self.create_widgets()
        self.check_url_to_filename()
        self.root.mainloop()

    @staticmethod
    def _filename_from_url(url):
        """Return the decoded last path segment of *url* (query string dropped)."""
        last_segment = urlsplit(url).path.rsplit("/", 1)[-1]
        # Decode percent-escapes, then guard against an escaped "/" in the name.
        return unquote(last_segment).rsplit("/", 1)[-1]

    @staticmethod
    def _split_ranges(file_size, num_threads):
        """Split ``[0, file_size)`` into inclusive ``(start, end)`` byte ranges.

        At most *num_threads* ranges are returned; fewer when the file has
        fewer bytes than threads, so no range is ever empty. The last range
        absorbs the remainder and ends at ``file_size - 1`` because HTTP Range
        end offsets are inclusive.
        """
        parts = max(1, min(num_threads, file_size))
        span = file_size // parts
        ranges = []
        for index in range(parts):
            start = index * span
            end = start + span - 1 if index < parts - 1 else file_size - 1
            ranges.append((start, end))
        return ranges

    def check_url_to_filename(self):
        """Poll the URL entry and pre-fill an empty save path with its filename."""
        url_text = self.url_entry.get().strip()
        path_text = self.path_var.get().strip()
        if url_text and not path_text:
            suggestion = self._filename_from_url(url_text)
            if suggestion:
                self.path_var.insert(0, suggestion)
        self.root.after(100, self.check_url_to_filename)

    def on_close(self):
        """Close the main window unless a download is still running."""
        if self.is_downloading:
            messagebox.showinfo("Warning", "下载任务尚未结束,无法退出")
        else:
            self.root.destroy()

    def on_close_nwin(self):
        """Close the progress window unless a download is still running."""
        if self.is_downloading:
            messagebox.showinfo("Warning", "下载任务尚未结束,无法退出")
        else:
            self._close_progress_window()

    def change_text_del(self, box, message):
        """Replace the whole content of entry widget *box* with *message*."""
        box.delete(0, END)
        self.change_text(box, message)

    def change_text(self, box, message):
        """Append *message* to *box* and scroll to the end when supported."""
        box.insert(END, message)
        try:
            box.see(END)
        except (AttributeError, TclError):
            # Entry widgets have no see(); scrolling is best-effort only.
            pass
        box.update()

    def create_widgets(self):
        """Build the main window: inputs, progress bar, start button and menus."""
        # Grid layout
        self.root.columnconfigure(1, weight=1)
        self.root.rowconfigure(4, weight=1)
        # URL input
        ttk.Label(self.root, text="下载URL:").grid(row=0, column=0, sticky=W, padx=5, pady=5)
        self.url_entry = ttk.Entry(self.root, width=50)
        self.url_entry.grid(row=0, column=1, columnspan=3, sticky=EW, padx=5, pady=5)
        # Save path (an Entry widget despite the "_var" name)
        ttk.Label(self.root, text="保存路径:").grid(row=1, column=0, sticky=W, padx=5, pady=5)
        self.path_var = ttk.Entry(self.root, width=40)
        self.path_var.grid(row=1, column=1, sticky=EW, padx=5, pady=5)
        ttk.Button(self.root, text="浏览", command=self.select_path).grid(row=1, column=2, sticky=W, padx=5, pady=5)
        # Thread count
        ttk.Label(self.root, text="线程数量:").grid(row=2, column=0, sticky=W, padx=5, pady=5)
        self.thread_spin = ttk.Spinbox(self.root, from_=1, to=16, width=5)
        self.thread_spin.grid(row=2, column=1, sticky=W, padx=5, pady=5)
        self.thread_spin.delete(0, "end")
        self.thread_spin.insert(0, "16")
        # Speed display
        self.speed_label = ttk.Label(self.root, text="<Waiting>")
        self.speed_label.grid(row=2, column=2, sticky=E, padx=10, pady=5)
        # Progress bar
        self.progress_bar = ttk.Progressbar(self.root, orient=HORIZONTAL, mode='determinate', bootstyle="dark-striped")
        self.progress_bar.grid(row=3, column=0, columnspan=4, sticky=EW, padx=5, pady=5)
        # Start button
        ttk.Button(self.root, text="开始下载", command=self.start_download).grid(row=4, column=3, sticky=E, padx=5, pady=5)

        def fill_url(url):
            # Bind *url* now so every menu entry keeps its own value.
            return lambda: self.change_text_del(self.url_entry, url)

        # Menu bar
        self.menubar = Menu(self.root)
        self.filemenubar = Menu(self.menubar, tearoff=0)
        self.presets = Menu(self.filemenubar, tearoff=0)
        self.presets.add_command(label="2GFile(Party) ", command=fill_url("https://u5.gdl.netease.com/party_netease_103_1.0.194_bf7604.apk?key1=0af02968660ca904c4002a46d43b235c&key2=681182d0"))
        self.presets.add_command(label="300MFile(WeChat)", command=fill_url("https://dldir1v6.qq.com/weixin/Windows/WeChatSetup.exe"))
        self.presets.add_command(label="Poor internet connection(MCmods)", command=fill_url("https://filesshare.tzzl.site/api/raw/QST%E6%A8%A1%E7%BB%84%E5%B0%8F%E6%89%93%E5%8C%852modpack.zip?auth=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyIjp7ImlkIjo2LCJsb2NhbGUiOiJ6aC1jbiIsInZpZXdNb2RlIjoibW9zYWljIiwic2luZ2xlQ2xpY2siOmZhbHNlLCJwZXJtIjp7ImFkbWluIjpmYWxzZSwiZXhlY3V0ZSI6dHJ1ZSwiY3JlYXRlIjp0cnVlLCJyZW5hbWUiOnRydWUsIm1vZGlmeSI6dHJ1ZSwiZGVsZXRlIjp0cnVlLCJzaGFyZSI6dHJ1ZSwiZG93bmxvYWQiOnRydWV9LCJjb21tYW5kcyI6W10sImxvY2tQYXNzd29yZCI6ZmFsc2UsImhpZGVEb3RmaWxlcyI6ZmFsc2UsImRhdGVGb3JtYXQiOmZhbHNlfSwiaXNzIjoiRmlsZSBCcm93c2VyIiwiZXhwIjoxNzQ0MzA2NjI2LCJpYXQiOjE3NDQyOTk0MjZ9.AvKY6PxGX07JsbAU_NvZ-hLAhMOVf-mu3cGF64FdI0U&"))
        self.presets.add_command(label="nwJS(Party)", command=fill_url("https://dl.nwjs.io/v0.92.0/nwjs-v0.92.0-win-x64.zip"))
        self.filemenubar.add_cascade(label="预设文件下载链接 ", menu=self.presets)
        # NOTE(review): all three edit entries clear the URL entry; confirm
        # whether the later two were meant to clear something else.
        self.editmenubar = Menu(self.menubar, tearoff=0)
        self.editmenubar.add_command(label="清除下载链接 ", command=fill_url(""))
        self.editmenubar.add_command(label="清除URL", command=fill_url(""))
        self.editmenubar.add_command(label="清除输入框", command=fill_url(""))
        self.filemenubar.add_cascade(label="编辑", menu=self.editmenubar)
        self.filemenubar.add_command(label="关于", command=lambda: messagebox.showinfo("关于", "作者:Fishing\n版本:3.1\n说明:增强版多线程下载器,支持断点续传和多线程下载。"))
        self.filemenubar.add_command(label="日志显示", command=self.show_log)
        self.menubar.add_cascade(label="文件", menu=self.filemenubar)
        self.root.config(menu=self.menubar)
        # Pass a callable: the old code called update() once and bound None.
        self.root.bind("<B1-Motion>", lambda event: self.root.update_idletasks())

    def select_path(self):
        """Ask for a save location and put it into the save-path entry."""
        path = filedialog.asksaveasfilename()
        if path:
            # path_var is an Entry, so replace its text instead of calling .set().
            self.change_text_del(self.path_var, path)

    def log(self, message):
        """Append *message* on a new line of the log file.

        Called from worker threads as well, so the record goes out in a single
        write to keep concurrent entries from interleaving mid-line.
        """
        with open(self.LOG_FILE, "a+", encoding="utf-8") as f:
            f.write("\n" + message)

    def _read_log(self):
        """Return the log file content, or an empty string if it does not exist."""
        try:
            with open(self.LOG_FILE, "r", encoding="utf-8") as f:
                return f.read()
        except FileNotFoundError:
            return ""

    def destroy_log(self):
        """Close the log window and allow it to be opened again."""
        self.is_logging = False
        self.logwin.destroy()

    # Original (misspelled) name kept so existing references keep working.
    destory_log = destroy_log

    def clean_log(self):
        """Truncate the log file and refresh the log window."""
        with open(self.LOG_FILE, "w", encoding="utf-8"):
            pass
        self.refresh_log()

    def refresh_log(self):
        """Reload the log file into the log window's text area."""
        content = self._read_log()
        self.log_area.delete("1.0", END)
        self.log_area.insert("1.0", content)

    def show_log(self):
        """Open the log window (at most one at a time)."""
        if self.is_logging:
            return
        self.logwin = ttk.Toplevel(self.root)
        self.logwin.title("日志")
        self.log_area = ScrolledText(self.logwin, height=12)
        self.log_area.grid(row=0, column=0, columnspan=4)
        self.is_logging = True
        self.refresh_log()
        ttk.Button(self.logwin, text="刷新日志", command=self.refresh_log).grid(row=1, column=0)
        ttk.Button(self.logwin, text="清空日志", command=self.clean_log).grid(row=1, column=3)
        self.logwin.protocol("WM_DELETE_WINDOW", self.destroy_log)

    def format_speed(self, bytes_per_sec):
        """Format a byte rate as ``"<value> B/s|KB/s|MB/s"`` with two decimals."""
        units = ['B/s', 'KB/s', 'MB/s']
        unit_idx = 0
        while bytes_per_sec >= 1024 and unit_idx < 2:
            bytes_per_sec /= 1024
            unit_idx += 1
        return f"{bytes_per_sec:.2f} {units[unit_idx]}"

    def _ask_retry(self, part_num):
        """Ask the user whether to retry part *part_num*; return the answer.

        Runs on a worker thread. Tk dialogs must be shown by the GUI thread,
        so the question is queued for the monitor callback and this call
        blocks until it has been answered.
        """
        request = {"part": part_num, "answer": False, "done": threading.Event()}
        self.prompt_queue.put(request)
        request["done"].wait()
        return request["answer"]

    def _serve_retry_prompts(self):
        """Show every queued retry question (GUI thread only)."""
        while not self.prompt_queue.empty():
            request = self.prompt_queue.get()
            try:
                request["answer"] = messagebox.askyesno(
                    "Warning", "线程-%dbomb,是否重试?" % request["part"]
                )
            finally:
                # Always release the worker, even if the dialog failed.
                request["done"].set()

    def download_chunk(self, url, start, end, part_num):
        """Download bytes ``start..end`` (inclusive) of *url* into ``part_<part_num>``.

        Worker-thread entry point. Progress is reported by putting byte counts
        on ``download_queue``. On failure the user is asked whether to retry;
        a retry resumes at the first missing byte and appends to the part
        file. If the user declines, ``download_failed`` is set and the part is
        left incomplete.
        """
        self.log("线程-%d已分配%d-%d字节,正在下载。" % (part_num, start, end))
        temp_file = self.temp_dir / f"part_{part_num}"
        position = start  # offset of the next byte still to be fetched
        while True:
            try:
                headers = {
                    'Range': f'bytes={position}-{end}',
                    # Ask for raw bytes so received counts match Content-Length.
                    'Accept-Encoding': 'identity',
                }
                with requests.get(url, headers=headers, stream=True, timeout=(10, 30)) as response:
                    response.raise_for_status()
                    whole_file = position == 0 and end >= self.file_size - 1
                    if response.status_code != 206 and not whole_file:
                        # A plain 200 means the server ignored Range and is
                        # sending the whole file, which would corrupt the merge.
                        raise RuntimeError("服务器不支持分段下载(Range)")
                    # Start fresh on the first bytes, append when resuming.
                    mode = 'wb' if position == start else 'ab'
                    with open(temp_file, mode) as f:
                        for chunk in response.iter_content(chunk_size=self.chunk_size):
                            if chunk:
                                f.write(chunk)
                                position += len(chunk)
                                self.download_queue.put(len(chunk))
                if position <= end:
                    raise IOError("连接提前关闭,数据不完整")
                self.log("线程-%d已完成!" % part_num)
                return
            except Exception as e:
                self.log(f"线程{part_num}下载失败: {str(e)}")
                self.download_queue.put(0)  # keep the monitor loop ticking
                time.sleep(0.5)
                if not self._ask_retry(part_num):
                    self.download_failed = True
                    return
                self.log("线程-%d已重试!" % part_num)

    def _progress_window_alive(self):
        """Return True while the progress window exists."""
        return self.nwin is not None and bool(self.nwin.winfo_exists())

    def _close_progress_window(self):
        """Destroy the progress window if it is open."""
        window, self.nwin = self.nwin, None
        if window is not None and window.winfo_exists():
            window.destroy()

    def _update_window(self, event):
        """Keep the progress window translucent and repainted while dragged."""
        self.nwin.attributes("-alpha", 0.9)
        self.nwin.update_idletasks()
        self.root.update_idletasks()

    def _update_shower(self):
        """Extend the progress chart; reschedules itself while downloading.

        The x axis is the completed fraction (400 px wide); bar height is the
        average speed so far, scaled so that 50 MiB/s fills the 200 px canvas.
        """
        if not self.is_downloading or not self._progress_window_alive():
            return
        elapsed = max(time.time() - self.download_time, 1e-6)
        x = self.downloaded_total / self.file_size * 400
        y = self.downloaded_total / elapsed / (50 * 1024 * 1024) * 200
        if x > self.ltx:
            # Only draw when progress advanced, so idle polls add no items.
            self.shower.create_rectangle(self.ltx, 200 - y, x, 200, fill="green", outline="green")
            self.ltx = x
        self.root.after(50, self._update_shower)

    def _update_time(self):
        """Refresh the remaining-time label once per second while downloading."""
        if not self.is_downloading or not self._progress_window_alive():
            return
        elapsed = max(time.time() - self.download_time, 1)
        average_speed = max(self.downloaded_total / elapsed, 1)
        eta_sec = int((self.file_size - self.downloaded_total) / average_speed)
        self.eta['text'] = "剩余时间:" + ("%02d" % (eta_sec // 3600)) + ":" + ("%02d" % (eta_sec // 60 % 60)) + ":" + ("%02d" % (eta_sec % 60))
        self.root.after(1000, self._update_time)

    def _start_shower(self):
        """Open the progress window (ETA label plus speed/progress chart)."""
        self.ltx = 0
        self.nwin = ttk.Toplevel(self.root)
        self.nwin.title("Progress")
        self.nwin.protocol("WM_DELETE_WINDOW", self.on_close_nwin)
        self.eta = ttk.Label(self.nwin)
        self.eta.pack()
        self.shower = Canvas(self.nwin, width=400, height=200)
        self.shower.pack()
        self.nwin.bind("<B1-Motion>", self._update_window)
        self.root.after(100, self._update_shower)
        self.root.after(100, self._update_time)

    def _clear_temp_parts(self):
        """Remove part files left over from a previous download (best effort)."""
        self.temp_dir.mkdir(exist_ok=True)
        for temp_file in self.temp_dir.glob("part_*"):
            try:
                temp_file.unlink()
            except OSError:
                # A locked or vanished file is not fatal; workers overwrite it.
                pass

    def _probe_file_size(self, url):
        """Return the size of *url* in bytes as reported by a HEAD request.

        Raises:
            ConnectionError: the server answered with an error status.
            ValueError: Content-Length is missing, zero or not a number.
        """
        # requests.head() does not follow redirects unless asked to.
        response = requests.head(
            url,
            allow_redirects=True,
            timeout=15,
            headers={'Accept-Encoding': 'identity'},
        )
        if not response.ok:
            raise ConnectionError("无法连接服务器")
        try:
            size = int(response.headers.get('content-length', 0))
        except ValueError:
            size = 0
        if size <= 0:
            raise ValueError("无效的文件大小")
        return size

    def start_download(self):
        """Validate the inputs, start the worker threads and begin monitoring.

        Any error during start-up is logged and shown to the user, and the
        downloader returns to its idle state.
        """
        if self.is_downloading:
            messagebox.showwarning("警告", "当前已有下载任务在进行")
            return
        try:
            self._clear_temp_parts()
            # Read the inputs
            url = self.url_entry.get().strip()
            save_path = self.path_var.get().strip()
            if not url or not save_path:
                raise ValueError("URL和保存路径不能为空")
            try:
                num_threads = int(self.thread_spin.get())
            except ValueError:
                raise ValueError("线程数必须是1-256的整数") from None
            if not 1 <= num_threads <= 256:
                raise ValueError("线程数必须是1-256的整数")
            # Reset state
            self.is_downloading = True
            self.download_failed = False
            self.progress = 0
            self.downloaded_bytes = 0
            self.downloaded_total = 0
            self.last_update = time.time()
            self.download_time = time.time()
            self.progress_bar['value'] = 0
            self.speed_label.config(text="准备中 |")
            self.root.update_idletasks()
            # Drop byte counts left over from an earlier run.
            while not self.download_queue.empty():
                self.download_queue.get()
            self.log("\n=== 开始新下载任务 ===")
            # Probe the file
            self.file_size = self._probe_file_size(url)
            self.progress_bar.config(maximum=self.file_size)
            self._start_shower()
            # Start the worker threads
            threads = []
            ranges = self._split_ranges(self.file_size, num_threads)
            for index, (start, end) in enumerate(ranges):
                self.speed_label.config(text="准备中 " + "|\\-/"[index % 4])
                thread = threading.Thread(
                    target=self.download_chunk,
                    args=(url, start, end, index),
                    daemon=True,
                )
                thread.start()
                threads.append(thread)
            self.speed_label.config(text="下载中")
            self.root.update_idletasks()
            # Start monitoring
            self.monitor_progress(threads, save_path)
        except Exception as e:
            self.log(f"错误: {str(e)}")
            self.is_downloading = False
            self._close_progress_window()
            self.speed_label.config(text="<Waiting>")
            messagebox.showerror("错误", str(e))

    def _finish_download(self, save_path):
        """Merge the parts, restore the idle UI and report the outcome."""
        error_text = None
        try:
            if self.download_failed:
                error_text = "下载未完成:部分线程下载失败"
                self.log(error_text)
            else:
                self.merge_files(save_path)
                self.log("下载完成!文件已保存到:" + save_path)
        except Exception as e:
            error_text = str(e)
            self.log(f"文件合并失败: {str(e)}")
            self.log("临时文件保留在: " + str(self.temp_dir))
        finally:
            # Go idle before any dialog opens so the progress callbacks stop.
            self.is_downloading = False
            self._close_progress_window()
            self.speed_label.config(text="<Waiting>")
        if error_text is None:
            messagebox.showinfo("下载完成", "下载完成!文件已保存到:" + save_path)
        else:
            messagebox.showerror("错误", error_text)

    def monitor_progress(self, threads, save_path):
        """Poll the workers every 100 ms from the GUI thread until all finish.

        Each poll answers pending retry questions, folds reported byte counts
        into the progress bar and speed label, and finalises the download once
        no worker in *threads* is alive.
        """
        def update():
            self._serve_retry_prompts()
            # Check liveness BEFORE draining: anything a worker queued before
            # exiting is then guaranteed to be picked up by the drain below.
            workers_alive = any(t.is_alive() for t in threads)
            while not self.download_queue.empty():
                received = self.download_queue.get()
                self.progress += received
                self.downloaded_bytes += received
                self.downloaded_total += received
            self.progress_bar['value'] = self.progress
            # Refresh the speed display about once per second
            now = time.time()
            time_diff = now - self.last_update
            if time_diff >= 1:
                self.speed = self.downloaded_bytes / time_diff
                self.speed_label.config(text=f"速度: {self.format_speed(self.speed)}")
                self.last_update = now
                self.downloaded_bytes = 0
            if workers_alive:
                self.root.after(100, update)
            else:
                self._finish_download(save_path)

        self.root.after(100, update)

    def merge_files(self, save_path):
        """Concatenate ``part_<n>`` files in numeric order into *save_path*.

        Parts are streamed rather than loaded whole, and are deleted only
        after the output file has been written completely, so a failed merge
        leaves every part in place.

        Raises:
            RuntimeError: the output could not be written or a part could not
                be read.
        """
        try:
            parts = sorted(
                self.temp_dir.glob("part_*"),
                key=lambda p: int(p.name.split("_")[1]),
            )
            with open(save_path, 'wb') as output:
                for part in parts:
                    with open(part, 'rb') as source:
                        shutil.copyfileobj(source, output, 1024 * 1024)
            for part in parts:
                part.unlink()
        except PermissionError as e:
            raise RuntimeError("文件写入失败,请检查路径权限") from e
        except Exception as e:
            raise RuntimeError(f"文件合并错误: {str(e)}") from e
if __name__ == "__main__":
    # The constructor builds the window and runs the Tk main loop itself, so
    # this call blocks until the application window is closed.
    EnhancedDownloader()