相关文章推荐
知识渊博的热带鱼  ·  在 Power BI Desktop ...·  2 周前    · 
开朗的领带  ·  必学:变分法与有限元法_泛函·  2 年前    · 
风度翩翩的豆腐  ·  利用虚假交易“薅羊毛” ...·  2 年前    · 
风流的啄木鸟  ·  《快把我哥带走》电影观后感 - 知乎·  2 年前    · 
含蓄的火柴  ·  北京大学法学院学工网·  2 年前    · 
从容的绿豆  ·  贪污7000万的95后 ...·  2 年前    · 
Code  ›  python 分隔列队机制完美解决TCP粘包\分包问题开发者社区
python算法 socket python继承 分隔符
https://cloud.tencent.com/developer/article/2213485
深情的围巾
2 年前
zmh-program
0 篇文章

python 分隔列队机制完美解决TCP粘包\分包问题

前往专栏
腾讯云
开发者社区
文档 意见反馈 控制台
首页
学习
活动
专区
工具
TVP
最新优惠活动
文章/答案/技术大牛
发布
首页
学习
活动
专区
工具
TVP 最新优惠活动
返回腾讯云官网
zmh-program
首页
学习
活动
专区
工具
TVP 最新优惠活动
返回腾讯云官网
社区首页 > 专栏 > 信息技术博客 > python 分隔列队机制完美解决TCP粘包\分包问题

python 分隔列队机制完美解决TCP粘包\分包问题

作者头像
zmh-program
发布 于 2023-02-06 10:00:40
305 0
发布 于 2023-02-06 10:00:40
举报

通常, TCP接收为210字节(1024 bytes, 213bits), 包括了4 字节的消息头和 1020字节的消息. 那如果超出这个范围呢? 分多次发, python提供了一个接口 python.socket.sendall(bytes) 。 在套接字中, 由于 TCP的优化 Nagle算法机制 或者 接受最大值(MSS) < 应接收的值 ,出现 粘包 , 分包 现象 将多次间隔较小、数据量较小的数据,合并成一个数据量大的数据块,然后进行封包。那么这样一来,接收端就必须使用高效科学的拆包机制来分辨这些数据。 (如图1)

请添加图片描述
请添加图片描述

解决此问题, 可以将发送的内容转换repr (‘something\n’ -> ‘something\n"’), 并添加分隔符. 解析的时候, 通过分割分隔符, 组成列队 Queue , 先出后进. 如果其中有分隔符, 那么其中的内容进入 ReadyQueue ,如果分割最后无分隔符, 则进入等待 WaitKey , 等下次分割出的第一个相结合, 进入 ReadyQueue . (如图2)

情况一 接收 "data1\n" 这是一个完整的数据包, 分割出来 ["data1", ""] 会将 "data1" 与前面 waitKey 将(初始化为 "" )结合进入列队. 并将最后面的 "" 设为 waitKey

情况二 接收 ata2 这是分包导致的,分割得出 ["ata2"] 将其与前面 waitKey 结合,不进入列队,等待分隔符。将下一次含有分隔符前端的数据结合进入列队。

情况三 接收 \ndata3\ndata4\nd 这是粘包导致的后面还加带了一点点数据 分割得出 ["", "data3", "data4", "d"]

前面的 waitKey 与第一个空字符结合,进入列队 带data3与data4进入列队 waitKey 设为第四个"d" .

请添加图片描述
请添加图片描述

怎么用python解决呢?

导入库 socket, threading

文章目录

  • 导入
  • | SocketHandler类
        • ·初始化
        • ·接收以及异常处理
    • △ 解析(重点)
  • | 测试
  • | 封装

导入

import socket
from threading import Thread

| SocketHandler类

class SocketHandler(object): #由于 accept为被动接受, 所以不继承 socket.socket
	split_text = "\n"  # 类变量, 默认分隔符为回车(\n)
·初始化
def to_thread(target, Daemon=True) -> callable:
    def run(*args, name=str()) -> Thread:
        thread = Thread(target=target, args=args)
        thread.setDaemon(Daemon)
        if name:
            thread.setName(name)
        return thread
    return run
class SocketHandler(object):
# ...
    def __init__(self, socket=socket.socket(), bufsize=1024, codec="utf8", run=True):
        self.socket, self.bufsize, self.codec = socket, bufsize, codec
        self.waitKey = str()
        self.ReadyQueue = []
        self._closed = False
        if run:
            self.run()
    def run(self):
        self.forever_receive(name=f"客户端{socket}").start()
    @to_thread
    def forever_receive(self) -> (str, None):
        while self.isOpen():
            data = self.__recv()
            if isinstance(data, bytes) and data:
                self.parse_data(self.handle(data))
                continue
            elif data is ConnectionError:
                return
·接收以及异常处理
def ignore(function):
    def func(*args, **kwargs):
            return function(*args, **kwargs)
        except:
    return func
class SocketHandler(object):
# ...
    def __del__(self):
        self.quit()
    def isOpen(self) -> bool:
        return not (self._closed and getattr(self.socket, "_closed", False))
    def quitEvent(self) -> None:
    def quit(self) -> None:
        self._closed = True
        self.quitEvent()
        self.socket.close()
    def __recv(self) -> (bytes, ConnectionError):
            return self.socket.recv(self.bufsize).strip(b" ")  # str.strip()不可用! 会将\n省略
        except (ConnectionAbortedError, ConnectionRefusedError, ConnectionResetError, OSError) as e:
            self.quit()
            return ConnectionError
    def __send(self, data: bytes) -> bool:
            self.socket.sendall(data)
            return True
        except (ConnectionAbortedError, ConnectionRefusedError, ConnectionResetError, OSError) as e:
            self.quit()
            return False
    def send(self, data) -> bool:
        if isinstance(data, str):
            data = data.encode(self.codec)
        elif isinstance(data, (set, list, tuple)):
            data = repr(data)
        elif isinstance(data, (int, float)):
            data = str(data).encode(self.codec)
        elif isinstance(data, bytes):
        else:
            data = bytes(data)
        return self.__send(data + self.split_text.encode(self.codec))
    @ignore
    def connect(self, host: str, port: int):
        assert 0 <= port <= (2 ** 16) - 1
        self.socket.connect((host, port))

△ 解析(重点)

    def handle(self, data: bytes):
        return [d.strip() for d in data.decode(self.codec).split(self.split_text)]
    @ignore  # assert bool(list)
    def parse_data(self, generator: (tuple, list, set)) -> None:
        generator = list(generator)
        if len(generator) == 1:  # 列表为1, 表明无间隔符, 则在等待中添加. 
            self.waitKey += generator[0]
            return
        self.ReadyQueue.append(self.waitKey + generator.pop(0)) #将原先的等待值
        self.waitKey = generator.pop()
        self.ReadyQueue.extend(generator)
    def recv(self) -> str:
        while not self.ReadyQueue:
        return self.ReadyQueue.pop(0)
    def recv_list(self) -> list:
        queue = self.ReadyQueue[:]
        self.ReadyQueue = []
        return queue

| 测试

import time
class Debugger:
    addr = ("127.0.0.1", 429)
    s = socket.socket()
    s.bind(addr)
    s.listen(10)
    user = SocketHandler(bufsize=20, run=False)
    user.connect(*addr)
    server = SocketHandler(s.accept()[0], run=False)
    user.run()
    def __init__(self):
        self.IO()
    def IO(self):
        while True:
            self.server.send(time.time())
            self.server.send(time.time())
            print(self.user.recv_list())
            time.sleep(3)
debug = Debugger()
debug.IO()

| 封装

import socket
def ignore(function):
    def func(*args, **kwargs):
            return function(*args, **kwargs)
        except:
    return func
class SocketHandler(object):
    split_text = "\n"  # 类变量, 默认分隔符为回车(\n)
    def __init__(self, socket=socket.socket(), bufsize=1024, codec="utf8", run=True):
        self.socket, self.bufsize, self.codec = socket, bufsize, codec
        self.waitKey = str()
        self.ReadyQueue = []
        self._closed = False
        if run:
            self.forever_receive()
    def __del__(self):
        self.quit()
    def isOpen(self) -> bool:
        return not (self._closed and getattr(self.socket, "_closed", False))
    def quitEvent(self) -> None:
    def quit(self) -> None:
        self._closed = True
        self.quitEvent()
        self.socket.close()
    def __recv(self) -> (bytes, ConnectionError):
            return self.socket.recv(self.bufsize).strip(b" ")  # str.strip()不可用! 会将\n省略
        except (ConnectionAbortedError, ConnectionRefusedError, ConnectionResetError, OSError) as e:
            self.quit()
            return ConnectionError
    def __send(self, data: bytes) -> bool:
            self.socket.sendall(data)
            return True
        except (ConnectionAbortedError, ConnectionRefusedError, ConnectionResetError, OSError) as e:
            self.quit()
            return False
    def send(self, data) -> bool:
        if isinstance(data, str):
            data = data.encode(self.codec)
        elif isinstance(data, (set, list, tuple)):
            data = repr(data)
        elif isinstance(data, (int, float)):
            data = str(data).encode(self.codec)
        elif isinstance(data, bytes):
        else:
            data = bytes(data)
        return self.__send(data + self.split_text.encode(self.codec))
    def forever_receive(self) -> (str, None):
        while self.isOpen():
            self.receive_datas()
    def receive_datas(self) -> bool:
        data = self.__recv()
        if isinstance(data, bytes) and data:
            self.parse_data(self.handle(data))
            return True
        elif data is ConnectionError:
            return False
    def handle(self, data: bytes):
        return [d.strip() for d in data.decode(self.codec).split(self.split_text)]
    @ignore  
    def parse_data(self, generator: (tuple, list, set)) -> None:
        generator = list(generator)
        if len(generator) == 1:  # 列表为1, 表明无间隔符, 则在等待中添加.
            self.waitKey += generator[0]
            return
        self.ReadyQueue.append(self.waitKey + generator.pop(0))
        self.waitKey = generator.pop()
        self.ReadyQueue.extend(generator)
    def recv(self) -> str:
        while not self.ReadyQueue:
            self.receive_datas()
        return self.ReadyQueue.pop(0)
    def recv_list(self) -> list:
        queue = self.ReadyQueue[:]
        self.ReadyQueue = []
 
推荐文章
知识渊博的热带鱼  ·  在 Power BI Desktop 中使用自訂格式字串 - Power BI | Microsoft Learn
2 周前
开朗的领带  ·  必学:变分法与有限元法_泛函
2 年前
风度翩翩的豆腐  ·  利用虚假交易“薅羊毛” 男子涉诈骗罪获刑八个月-中国法院网
2 年前
风流的啄木鸟  ·  《快把我哥带走》电影观后感 - 知乎
2 年前
含蓄的火柴  ·  北京大学法学院学工网
2 年前
从容的绿豆  ·  贪污7000万的95后 买了一张纯金的“青眼白龙”被竟拍到8700万!_手机新浪网
2 年前
今天看啥   ·   Py中国   ·   codingpro   ·   小百科   ·   link之家   ·   卧龙AI搜索
删除内容请联系邮箱 2879853325@qq.com
Code - 代码工具平台
© 2024 ~ 沪ICP备11025650号