TCP聊天 传输文件服务器服务器套接字v2.1
所有版本记录:
v1.0
: TCP聊天服务器套接字|PyQt5 socket(TCP端口映射 端口放行) logging Thread(含日志,html) anaconda打包32位exe(3.4万字)|python高阶v1.1
: python TCP套接字服务器v1.1-修改新服务端命令功能bug(socket PyQt5)v1.2
: python TCP服务器v1.2 - 服务端新用户登录注册(json, md5加密)v1.3
: python TCP服务器v1.3 - 服务器抗压试验及关闭套接字处理v1.4
: python TCP服务器v1.4 - 处理客户端连接服务器异常(异常情况分类)v1.5
: PyQt下拉框可编辑(comboBox):editable - python TCP服务器v1.5 - 增加客户端连接界面的自定义参数(设置加时, 可选连接地址)v1.6
: Python TCP服务器v1.6 - multiprocessing多进程及Ctrl-c(SIGINT)退出v1.7
: Python TCP服务器v1.7 - PyQt5 server服务端来临v1.8
: python TCP服务器v1.8 - PyQt5登录界面美化 淡入淡出v1.9
: socketTCP协程文件 信息传递 - TCP聊天文件服务器v1.9 - 划时代版本更新(4.6万字)v2.0
: TCP聊天文件服务器v2.0 - 重大bug修复 PyQt5文件传输可视化
虽说v2.0
里面有细微的线程管理, 但我觉得还不够.
文章目录
- 分析线程信息
- 改变线程运行函数
- 将线程管理结果转移到listWidget
- 服务端代码
引用
from threading import enumerate as _enumerate #预防和内置函数enumerate(iterable)混淆替代
分析线程信息
def thread_info(thread:Thread): return f"{
str(thread._name).ljust(12)}{
thread._ident}({
thread.__class__.__name__})"
改动线程运行函数
为此, 我改进了从v1.2
线程运行函数的开始
def threading(Daemon,name=None, **kwargs): thread = Thread(**kwargs) thread.setDaemon(Daemon) if name: thread.setName(name) thread.start() return thread
改变了四点
file_thread = threading(True, "文件传输", target=loop.run_forever)
class Server():
def run(self):
# ...
threading(Daemon=True,name="离线清理",target=self.clear_socket)
return threading(Daemon=True,name="监听端口", target=self.accept_client)
class Client():
def run(self):
self.thread = threading(True,name=f"客户端{
self.addr}",target=self.forever_receive)
把线程管理结果转到listWidget
class Interface(QtWidgets.QMainWindow):
def setupUi(self):
self.listView_2 = QtWidgets.QListWidget(self.groupBox_2)
self.listView_2.setObjectName("listView_2")
# ...
self.timer = QtCore.QTimer(self)
self.timer.timeout.connect(self.get_threadings)
self.timer.start(500)
def get_threadings(self):
_e = _enumerate()
if hasattr(self, "data") and self.data == _e:
return
self.data = _e
self.listView_2.clear()
self.listView_2.addItems(map(thread_info, self.data))
服务端代码
好了, 这就结束了
import os, socket, sys, time, logging
import data #同目录data.py
from psutil import cpu_percent
from PyQt5 import QtCore, QtGui, QtWidgets
#from signal import SIGINT, signal
import asyncio
import os
import sys
from threading import Thread
from threading import enumerate as _enumerate #防止与enumerate(iterable)混淆替代
__version__ = 2.1
base = 1024
segment = base * 2 # 防止切断
delay = 0.04
new_file = 0
update_file = 1
request_file = 2
normal_text = 3
loop = asyncio.get_event_loop()
class QLogger(logging.Handler):
def __init__(self, *args, **kwargs):
logging.Handler.__init__(self, *args, **kwargs)
self.setFormatter(logging.Formatter("[<font color='darkgreen'>%(asctime)s</font>(<font color='blue'>%(levelname)s</font>)]: <font color='brown'>%(message)s</font>"))
def emit(self, record):
record = self.format(record)
if hasattr(self, "output"):
self.output(record)
def connect(self, func):
if callable(func):
self.output = func
def threading(Daemon,name=None, **kwargs):
thread = Thread(**kwargs)
thread.setDaemon(Daemon)
if name:
thread.setName(name)
thread.start()
return thread
file_thread = threading(True, "文件传输", target=loop.run_forever)
def thread_info(thread:Thread):
return f"{
str(thread._name).ljust(12)}{
thread._ident}({
thread.__class__.__name__})"
def ignore(function):
def i(*args, **kwargs):
try:
function(*args, **kwargs)
except:
return
return i
logger = logging.getLogger(__name__)
logger.setLevel(level=logging.DEBUG)
Qlog = QLogger()
logger.addHandler(Qlog)
filehandle = logging.FileHandler("log.txt")
filehandle.setFormatter(logging.Formatter("[%(asctime)s(%(levelname)s)]: %(message)s"))
logger.addHandler(filehandle)
logger.setLevel(logging.DEBUG)
bytecount = 1024
def to_logging(command):
def logs(*args, **kwargs):
try:
_result = command(*args, **kwargs)
if _result is None:
return True
return _result
except:
logger.exception(str())
return False
return logs
class Command_Handler(object):
def __init__(self, bind):
"""Bind Client class"""
assert isinstance(bind, Client)
self.client = bind
def _function(self, _list):
data = {
"/info": {
"-v": self.get_version(),
"-id": self.get_id(),
"-i": self.info(),
"-h": self.help(),
"-name": self.name()},
}
_dict = data
for n in range(len(_list)):
if type(_dict) == dict:
_dict = _dict.get(_list[n], self.unknown(" ".join(_list)))
else:
break
if type(_dict) == dict:
_dict = "Error:\n<font color='blue'>This command must take more arguments. Such as %s.</font>" % list(
_dict.keys())
return _dict
@staticmethod
def help():
return """/info [-v] [-id] [-i] -v : get version of program. -id : get your id. -i : get information. -h : help. -name : get your name For example, <font color=red>/info -id</font>"""
@staticmethod
def get_version():
return "version : " + str(__version__)
def get_id(self):
return "Your id is {}.".format(id(self.client))
def name(self):
return "Your name is {}.".format(self.client.username)
def info(self):
return f"Socket Server[version {
self.get_version()}] By zmh."
def unknown(self, s):
return """Error: No command named "%s". Please search [/info -h] to help. %s""" % (s, self.help())
def cut(self, string):
return string.strip().split()
def handler(self, c):
return "<font color='gray'>[command]</font><font color='brown'>%s</font>\n%s" % (
c, str(self._function(self.cut(c))))
def iscommand(self, i):
return i.strip().startswith("/")
class Server():
join_message = "<font color='red'>Server></font> <font color='blue'>%s(%s)</font> 连接服务器. 当前在线人数: <font color='red'>%s</font>"
user_message = "<font color='%s'>%s(%s)%s></font> %s"
quit_message = "%s(%s) 下线了, %s"
def __init__(self, usernumUpdate=lambda _:None):
self.user_num_change = usernumUpdate
def Setup(self, addr, port, backlog=10, max_count=bytecount**2, encode='utf8'):
self.user_handle = message_handle(self)
self.address = addr, port
self.backlog = backlog
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.bind(self.address)
self.socket.listen(backlog)
self.max_count = max_count
self.connect = []
self.encode = encode
self.user_record = data.user()
return self.run()
def clear_socket(self, clear_ms = 500):
logger.info(f"Clear the closed socket once every {
clear_ms} ms.")
while True:
del_list = list(filter(lambda c: hasattr(c, 'Quitted') or (not c.isOpen()), self.connect))
for user in del_list:
self.connect.remove(user)
#if del_list:
# logger.info(f"Clear the closed client socket, number is {len(del_list)}.")
#else:
# logger.info('None of the sockets have been cleaned.')
time.sleep(clear_ms / 1000)
def run(self):
logger.debug(f"Server [{
':'.join(map(str, self.address))}] on.")
logger.info(f"pid {
os.getpid()}.")
logger.info(f"Max receive length {
covert(self.max_count, fine=True)}.")
logger.info(f"Single file transfer speed: <font color='blue'>{
covert(segment * (1 // delay))}/s<font>({
covert(segment)} × {
int(1 //delay)})")
gui.Database_signal.emit("<font color='gray'>[Transfer speed[-SEGMENT]] = [Maximum load] ÷ 2.</font>")
logger.info("Backlog number: " + str(self.backlog))
logger.info('The CODEC is sent as ' + self.encode)
threading(Daemon=True,name="离线清理",target=self.clear_socket)
return threading(Daemon=True,name="监听端口", target=self.accept_client)
def _get_Clients(self) -> list:
def func(c):
return c.__filter__()
return list(filter(func, self.connect))
def _get_sockets(self): # return int
i = len(self._get_Clients())
self.user_num_change(i)
return i
def _str_sockets(self):
return f"当前人数 {
self._get_sockets()}"
def ServerMessage(self, mes, inc=True):
for user in self._get_Clients():
if user.__filter__():
user._send(mes)
def UserMessage(self, address, _user, mes, inc=True):
if not mes:
return
for user in self.connect:
if user.__filter__():
username = user.username
send_message = Server.user_message % ("brown" if _user == username else "red",
_user,
address,
"(我自己)" if _user == username else "",
mes)
user._send(send_message)
logger.info(f"{
address}[{
_user}] : {
mes}")
def error_handle(self):
for user in filter(lambda c: not c.isOpen(), self.connect):
self.connect.remove(user)
def accept_client(self):
while True:
logger.info("The server is listening on the port.")
client, address = self.socket.accept() # 阻塞,等待客户端连接
NewClient = Client(client, address[0], self)
self.connect.append(NewClient)
NewClient.run()
logger.info(f'The address {
address[0]} is connected to the server')
def quit(self, username, address):
QuitMessage = Server.quit_message % (username, address, self._str_sockets())
logger.info(QuitMessage)
self.ServerMessage(QuitMessage, False)
def login(self, username, address):
logger.info(f"{
address}[{
username}] 登录服务器 , " + self._str_sockets())
self.ServerMessage(Server.join_message % (username, address, self._get_sockets()))
class Client(object):
class QuitError(Exception):
def __init__(self, *args):
super().__init__(*args)
def __init__(self, socket, addr, server: Server):
self.socket = socket
self.addr = addr
if not isinstance(server, Server):
raise ValueError
self.server = server
self.encode = self.server.encode
self.max_count = self.server.max_count
self.com = Command_Handler(self)
@self.error
def _recv(self) -> bytes:
return self.socket.recv(self.max_count).decode(encoding=self.encode).strip()
self._recv = lambda: _recv(self)