资讯详情

TCP聊天文件服务器v2.1 - 服务端线程管理(threading.enumerate)

TCP聊天文件传输服务器套接字v2.1 — threading线程管理(handle)

所有版本记录: v1.0 : TCP聊天服务器套接字|PyQt5 socket(TCP端口映射 端口放行) logging Thread(含日志,html) anaconda打包32位exe(3.4万字)|python高阶 v1.1 : python TCP套接字服务器v1.1-修改新服务端命令功能bug(socket PyQt5) v1.2 : python TCP服务器v1.2 - 服务端新用户登录注册(json, md5加密) v1.3 : python TCP服务器v1.3 - 服务器抗压试验及关闭套接字处理 v1.4 : python TCP服务器v1.4 - 处理客户端连接服务器异常(异常情况分类) v1.5 : PyQt下拉框可编辑(comboBox):editable - python TCP服务器v1.5 - 增加客户端连接界面的自定义参数(设置加时, 可选连接地址) v1.6 : Python TCP服务器v1.6 - multiprocessing多进程及Ctrl-c(SIGINT)退出 v1.7 : Python TCP服务器v1.7 - PyQt5 server服务端来临 v1.8 : python TCP服务器v1.8 - PyQt5登录界面美化 淡入淡出 v1.9 : socketTCP协程文件 信息传递 - TCP聊天文件服务器v1.9 - 划时代版本更新(4.6万字) v2.0 : TCP聊天文件服务器v2.0 - 重大bug修复 PyQt5文件传输可视化

虽说v2.0里面有细微的线程管理, 但我觉得还不够.

文章目录

  • 分析线程信息
  • 改变线程运行函数
  • 将线程管理结果转移到listWidget
  • 服务端代码

引用

from threading import enumerate as _enumerate #预防和内置函数enumerate(iterable)混淆替代 

分析线程信息

def thread_info(thread: Thread) -> str:
    """Return a one-line description of *thread* for display.

    The result is the thread name left-justified to 12 columns, immediately
    followed by the thread identifier (``None`` until the thread has been
    started) and the thread's class name in parentheses.
    """
    # Public ``name``/``ident`` properties replace the private ``_name``/
    # ``_ident`` attributes, and the f-string stays on one line so it also
    # parses on interpreters older than Python 3.12.
    return f"{str(thread.name).ljust(12)}{thread.ident}({type(thread).__name__})"

改变线程运行函数

为此, 我改进了从v1.2开始使用的线程运行函数:

def threading(Daemon, name=None, **kwargs):
    """Create, configure and start a ``Thread``, then return it.

    Args:
        Daemon: value assigned to the thread's ``daemon`` flag.
        name: optional thread name; a falsy value keeps the default name.
        **kwargs: forwarded unchanged to ``Thread(...)`` (``target``, ``args``, ...).

    Returns:
        The already-started ``Thread`` object.
    """
    thread = Thread(**kwargs)
    # Assign the attributes directly: setDaemon()/setName() are deprecated
    # since Python 3.10.
    thread.daemon = Daemon
    if name:
        thread.name = name
    thread.start()
    return thread

调用处改动了以下四点:

# Excerpt: the four call sites that now go through threading(), each with a
# descriptive thread name. Indentation is normalised to four spaces (the
# original mixed tabs and spaces) and the f-string is kept on one line.
file_thread = threading(True, "文件传输", target=loop.run_forever)


class Server():
    def run(self):
        # ...
        threading(Daemon=True, name="离线清理", target=self.clear_socket)
        return threading(Daemon=True, name="监听端口", target=self.accept_client)


class Client():
    def run(self):
        self.thread = threading(True, name=f"客户端{self.addr}", target=self.forever_receive)

将线程管理结果转移到listWidget

class Interface(QtWidgets.QMainWindow):
        """Main window excerpt: shows the live thread list in a QListWidget."""

        def setupUi(self):
            """Create the thread list widget and the timer that refreshes it."""
            self.listView_2 = QtWidgets.QListWidget(self.groupBox_2)
            self.listView_2.setObjectName("listView_2")
            # ... (remaining UI setup elided in this excerpt)
            # Poll the thread list every 500 ms.
            self.timer = QtCore.QTimer(self)
            self.timer.timeout.connect(self.get_threadings)
            self.timer.start(500)
        def get_threadings(self):
            """Refresh the list widget when the set of live threads has changed."""
            # Snapshot of the currently alive threads (threading.enumerate).
            _e = _enumerate()
            # Skip the redraw when the snapshot equals the cached one.
            if hasattr(self, "data") and self.data == _e:
                return
            self.data = _e
            # Rebuild the widget: one thread_info() line per thread.
            self.listView_2.clear()
            self.listView_2.addItems(map(thread_info, self.data))

服务端代码

好了, 这就结束了

import os, socket, sys, time, logging
import data #同目录data.py
from psutil import cpu_percent
from PyQt5 import QtCore, QtGui, QtWidgets
#from signal import SIGINT, signal
import asyncio
import os
import sys
from threading import Thread
from threading import enumerate as _enumerate #防止与enumerate(iterable)混淆替代

__version__ = 2.1
base = 1024
segment = base * 2  # twice ``base``; original note: "prevent cut-off"
delay = 0.04  # used by Server.run() to report segments per second (1 / delay)

# Integer tags; presumably message/packet kinds -- their use is outside this view.
new_file = 0
update_file = 1
request_file = 2
normal_text = 3

# asyncio.get_event_loop() is deprecated when no event loop has been set and
# raises RuntimeError on recent Python releases. Create the loop explicitly
# and register it as the current loop, which is what the old call did
# implicitly in the main thread.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

class QLogger(logging.Handler):
    """Logging handler that formats records as HTML and passes them to a callback."""

    def __init__(self, *args, **kwargs):
        """Initialise the base handler and install the HTML formatter."""
        super().__init__(*args, **kwargs)
        html_format = (
            "[<font color='darkgreen'>%(asctime)s</font>"
            "(<font color='blue'>%(levelname)s</font>)]: "
            "<font color='brown'>%(message)s</font>"
        )
        self.setFormatter(logging.Formatter(html_format))

    def emit(self, record):
        """Format *record* and forward the text to the connected callback, if any."""
        text = self.format(record)
        if not hasattr(self, "output"):
            # Nothing connected yet: the formatted text is dropped.
            return
        self.output(text)

    def connect(self, func):
        """Register *func* as the output callback; non-callables are ignored."""
        if not callable(func):
            return
        self.output = func

def threading(Daemon, name=None, **kwargs):
    """Create, configure and start a ``Thread``, then return it.

    Args:
        Daemon: value assigned to the thread's ``daemon`` flag.
        name: optional thread name; a falsy value keeps the default name.
        **kwargs: forwarded unchanged to ``Thread(...)`` (``target``, ``args``, ...).

    Returns:
        The already-started ``Thread`` object.
    """
    thread = Thread(**kwargs)
    # Assign the attributes directly: setDaemon()/setName() are deprecated
    # since Python 3.10.
    thread.daemon = Daemon
    if name:
        thread.name = name
    thread.start()
    return thread
# Daemon thread named "文件传输" that runs the module-level asyncio loop forever;
# it is started at import time.
file_thread = threading(True, "文件传输", target=loop.run_forever)

def thread_info(thread: Thread) -> str:
    """Return a one-line description of *thread* for display.

    The result is the thread name left-justified to 12 columns, immediately
    followed by the thread identifier (``None`` until the thread has been
    started) and the thread's class name in parentheses.
    """
    # Public ``name``/``ident`` properties replace the private ``_name``/
    # ``_ident`` attributes, and the f-string stays on one line so it also
    # parses on interpreters older than Python 3.12.
    return f"{str(thread.name).ljust(12)}{thread.ident}({type(thread).__name__})"
def ignore(function):
    """Decorator: call *function* and swallow any ``Exception`` it raises.

    The wrapper returns whatever *function* returns, or ``None`` when the call
    raised. ``KeyboardInterrupt``/``SystemExit`` are no longer swallowed
    (the original used a bare ``except``).
    """
    from functools import wraps  # local import keeps the block self-contained

    @wraps(function)
    def i(*args, **kwargs):
        try:
            # Propagate the result; the original silently discarded it.
            return function(*args, **kwargs)
        except Exception:
            # Best-effort by design: failures are deliberately ignored.
            return None

    return i


# Module logger at DEBUG level with two handlers: the HTML QLogger (``Qlog``)
# and a plain-text file handler writing to log.txt.
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

Qlog = QLogger()
logger.addHandler(Qlog)

filehandle = logging.FileHandler("log.txt")
filehandle.setFormatter(logging.Formatter("[%(asctime)s(%(levelname)s)]: %(message)s"))
logger.addHandler(filehandle)

bytecount = 1024

def to_logging(command):
    """Decorator that turns exceptions raised by *command* into a logged ``False``.

    The wrapper returns:
        * ``True`` when *command* returned ``None``,
        * *command*'s own result otherwise,
        * ``False`` when *command* raised; the traceback is written to the
          module ``logger``.
    """
    from functools import wraps  # local import keeps the block self-contained

    @wraps(command)
    def logs(*args, **kwargs):
        try:
            _result = command(*args, **kwargs)
        except Exception:
            # The original logged an empty message; name the failing callable
            # so the traceback can be attributed.
            logger.exception("Unhandled error in %s", getattr(command, "__name__", command))
            return False
        return True if _result is None else _result

    return logs
class Command_Handler(object):
    """Parse and answer slash commands (``/info ...``) on behalf of one client."""

    def __init__(self, bind):
        """Bind the handler to the ``Client`` whose commands it answers.

        Raises:
            TypeError: if *bind* is not a ``Client`` instance. (An ``assert``
                was used before, which is stripped under ``python -O``.)
        """
        if not isinstance(bind, Client):
            raise TypeError("bind must be a Client instance")
        self.client = bind

    def _function(self, _list):
        """Resolve the tokenised command *_list* and return the reply text.

        The command table maps each token to either a nested table or a
        handler. Handlers are stored uncalled and only the selected one is
        invoked, so an unrelated handler that fails (e.g. ``name()`` before
        the client has a ``username``) cannot break every command.
        Tokens after a resolved handler are ignored.
        """
        commands = {
            "/info": {
                "-v": self.get_version,
                "-id": self.get_id,
                "-i": self.info,
                "-h": self.help,
                "-name": self.name,
            },
        }
        node = commands
        for token in _list:
            if not isinstance(node, dict):
                # A handler was already selected; extra tokens are ignored.
                break
            if token not in node:
                return self.unknown(" ".join(_list))
            node = node[token]
        if isinstance(node, dict):
            # The command stopped at a table: list the options it accepts.
            return "Error:\n<font color='blue'>This command must take more arguments. Such as %s.</font>" % list(
                node.keys())
        return node()

    @staticmethod
    def help():
        """Return the usage text for ``/info``."""
        return """/info [-v] [-id] [-i] -v : get version of program. -id : get your id. -i : get information. -h : help. -name : get your name For example, <font color=red>/info -id</font>"""

    @staticmethod
    def get_version():
        """Return the program version as ``"version : <n>"``."""
        return "version : " + str(__version__)

    def get_id(self):
        """Return a sentence containing ``id()`` of the bound client object."""
        return "Your id is {}.".format(id(self.client))

    def name(self):
        """Return a sentence containing the bound client's ``username``."""
        return "Your name is {}.".format(self.client.username)

    def info(self):
        """Return the server banner.

        Uses ``__version__`` directly: embedding ``get_version()`` produced
        the doubled text "version version : ...".
        """
        return f"Socket Server[version {__version__}] By zmh."

    def unknown(self, s):
        """Return the error text for the unrecognised command *s*, plus the help."""
        return """Error: No command named "%s". Please search [/info -h] to help. %s""" % (s, self.help())

    def cut(self, string):
        """Split *string* into whitespace-separated tokens."""
        return string.strip().split()

    def handler(self, c):
        """Return the HTML reply for the raw command line *c* (echo + result)."""
        return "<font color='gray'>[command]</font><font color='brown'>%s</font>\n%s" % (
            c, str(self._function(self.cut(c))))

    def iscommand(self, i):
        """Return True when *i*, ignoring surrounding whitespace, starts with ``/``."""
        return i.strip().startswith("/")


class Server():
    """TCP chat server: accepts clients and broadcasts messages to them."""

    # HTML message templates sent to clients / written to the log.
    join_message = "<font color='red'>Server></font> <font color='blue'>%s(%s)</font> 连接服务器. 当前在线人数: <font color='red'>%s</font>"
    user_message = "<font color='%s'>%s(%s)%s></font> %s"
    quit_message = "%s(%s) 下线了, %s"

    def __init__(self, usernumUpdate=lambda _:None):
        """Store the callback invoked with the user count by ``_get_sockets``."""
        self.user_num_change = usernumUpdate

    def Setup(self, addr, port, backlog=10, max_count=bytecount**2, encode='utf8'):
        """Bind and listen on ``(addr, port)``, then start the server threads.

        Args:
            addr, port: address the listening socket is bound to.
            backlog: value passed to ``socket.listen``.
            max_count: stored as ``self.max_count`` (logged by ``run`` as the
                maximum receive length).
            encode: codec name stored as ``self.encode``.

        Returns:
            The accept thread returned by ``run()``.

        Raises:
            OSError: if binding or listening fails; the socket is closed first.
        """
        self.user_handle = message_handle(self)
        self.address = addr, port
        self.backlog = backlog
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self.socket.bind(self.address)
            self.socket.listen(backlog)
        except OSError:
            # Do not leak the descriptor when the port cannot be opened.
            self.socket.close()
            raise
        self.max_count = max_count
        self.connect = []
        self.encode = encode
        self.user_record = data.user()
        return self.run()

    def clear_socket(self, clear_ms = 500):
        """Loop forever, dropping quitted/closed clients every *clear_ms* ms."""
        logger.info(f"Clear the closed socket once every {clear_ms} ms.")
        while True:
            # Materialise the victims first so the list is not mutated while
            # it is being filtered.
            del_list = [c for c in self.connect if hasattr(c, 'Quitted') or (not c.isOpen())]
            for user in del_list:
                self.connect.remove(user)
            time.sleep(clear_ms / 1000)

    def run(self):
        """Log the configuration and start the cleanup and accept threads.

        Returns:
            The thread running ``accept_client``.
        """
        logger.debug(f"Server [{':'.join(map(str, self.address))}] on.")
        logger.info(f"pid {os.getpid()}.")
        logger.info(f"Max receive length {covert(self.max_count, fine=True)}.")
        # round(1 / delay) rather than 1 // delay: float floor division turns
        # 1 // 0.04 into 24.0 instead of 25. The closing tag was also
        # written as "<font>" and is now "</font>".
        per_second = round(1 / delay)
        logger.info(f"Single file transfer speed: <font color='blue'>{covert(segment * per_second)}/s</font>({covert(segment)} × {per_second})")
        gui.Database_signal.emit("<font color='gray'>[Transfer speed[-SEGMENT]] = [Maximum load] ÷ 2.</font>")
        logger.info("Backlog number: " + str(self.backlog))
        logger.info('The CODEC is sent as ' + self.encode)
        threading(Daemon=True,name="离线清理",target=self.clear_socket)
        return threading(Daemon=True,name="监听端口", target=self.accept_client)

    def _get_Clients(self) -> list:
        """Return the connected clients whose ``__filter__()`` is truthy."""
        return [c for c in self.connect if c.__filter__()]

    def _get_sockets(self):  # return int
        """Return the number of filtered clients and report it to the callback."""
        i = len(self._get_Clients())
        self.user_num_change(i)
        return i

    def _str_sockets(self):
        """Return the current user count as display text."""
        return f"当前人数 {self._get_sockets()}"

    def ServerMessage(self, mes, inc=True):
        """Send *mes* to every filtered client. *inc* is accepted but unused."""
        for user in self._get_Clients():
            if user.__filter__():
                user._send(mes)

    def UserMessage(self, address, _user, mes, inc=True):
        """Broadcast chat text *mes* from *_user* at *address*, then log it.

        Empty messages are ignored. Recipients whose username equals *_user*
        get the "(我自己)" marker and a different colour. *inc* is unused.
        """
        if not mes:
            return
        for user in self.connect:
            if user.__filter__():
                is_sender = _user == user.username
                send_message = Server.user_message % ("brown" if is_sender else "red",
                                                      _user,
                                                      address,
                                                      "(我自己)" if is_sender else "",
                                                      mes)
                user._send(send_message)
        logger.info(f"{address}[{_user}] : {mes}")

    def error_handle(self):
        """Remove every client whose socket is no longer open."""
        # Snapshot the closed clients first: removing while iterating a lazy
        # filter over the same list skips the element after each removal.
        for user in [c for c in self.connect if not c.isOpen()]:
            self.connect.remove(user)

    def accept_client(self):
        """Accept connections forever, wrapping each one in a started ``Client``."""
        while True:
            logger.info("The server is listening on the port.")
            client, address = self.socket.accept()  # blocks until a client connects
            NewClient = Client(client, address[0], self)
            self.connect.append(NewClient)
            NewClient.run()
            logger.info(f'The address {address[0]} is connected to the server')

    def quit(self, username, address):
        """Log and broadcast that *username* at *address* went offline."""
        QuitMessage = Server.quit_message % (username, address, self._str_sockets())
        logger.info(QuitMessage)
        self.ServerMessage(QuitMessage, False)

    def login(self, username, address):
        """Log and broadcast that *username* at *address* logged in."""
        logger.info(f"{address}[{username}] 登录服务器 , " + self._str_sockets())
        self.ServerMessage(Server.join_message % (username, address, self._get_sockets()))


class Client(object):
    class QuitError(Exception):
        def __init__(self, *args):
            super().__init__(*args)

    def __init__(self, socket, addr, server: Server):
        self.socket = socket
        self.addr = addr
        if not isinstance(server, Server):
            raise ValueError
        self.server = server
        self.encode = self.server.encode
        self.max_count = self.server.max_count
        self.com = Command_Handler(self)

        @self.error
        def _recv(self) -> bytes:
            return self.socket.recv(self.max_count).decode(encoding=self.encode).strip()

        self._recv = lambda: _recv(self)

       

标签: zb2188板型电阻

锐单商城拥有海量元器件数据手册IC替代型号,打造 电子元器件IC百科大全!

锐单商城 - 一站式电子元器件采购平台