资讯详情

UI 自动化测试实战(二)| 测试数据的数据驱动

WebSocket 在单个 TCP 全双工通信连接(Full Duplex 是通信传输的术语。通信允许数据在两个方向同时传输,相当于两种单工通信方式的结合。全双工指可同时(瞬时)双向传输信号( A→B 且 B→A )。指 A→B 的同时 B→A,协议为瞬时同步)。

https://ceshiren.com/uploads/default/original/3X/5/3/53ba29ea869c95aa63c329550c8cc8e9f5b723a7.png) 数据驱动是通过数据变化驱动自动化测试的执行,最终导致测试结果的变化。简单地说,它是参数化在自动化测试中的应用。 数据驱动在测试过程中的优点主要体现在以下几点: 1.提高代码重用率。相同的测试逻辑可以通过编写一个测试用例来重用多个测试数据,提高了测试代码的重用率和编写效率。 2.异常调查效率高。根据测试数据,每个数据生成一个测试用例,用例执行过程相互隔离。如果其中一个失败,则不会影响其他测试用例。 3.代码可维护性高,清晰的测试框架有利于其他测试工程师阅读,提高代码可维护性。 数据量小的测试用例可以使用代码的参数来实现数据驱动。当数据量大时,建议使用结构化文件(例如 YAML,JSON 等)存储数据,然后在测试用例中读取数据。 Pytest 提供了 @pytest.mark.parametrize 装饰器来进行参数化,可以使用参数化来实现数据驱动。代码如下: 首先使用上述代码 @pytest.mark.parametrize 装饰,传递两组数据,测试结果显示有两个测试用例被执行,而不是一个测试用例。也就是说, Pytest 两组测试数据将自动生成两个相应的测试用例并执行,并生成两个测试结果。 当测试数据量较大时,可以考虑将数据存储在结构化文件中。从文件中读取代码中所需格式的数据,并将其传输到测试方法中。建议您在这里使用它 YAML 存储测试数据的类型文件。YAML 以数据为中心,采用动态字段进行结构化 Excel、CSV、JSON、XML 等等更适合数据驱动。 下面,我们将上述两组参数数据存储到 YAML 在文件中创建一个 data/searchdata.yml 文件,代码如下: 上面的代码定义了一个代码 yaml 格式数据文件 searchdata.yml ,一个列表定义在文件中,列表中有两组数据,最终生成这样的数据格式:[[alibaba”, “BABA”, 200],[“JD”, “JD”, 20]] 。 下一步将测试用例中的参数数据从 searchdata.yml 读取文件,代码如下: 上述代码只需使用 yaml.safe_load() 方法,法 searchdata.yml 文件中的数据分别传输到用例中 test_search() 数据输入和结果据输入和结果的验证。而如果使用 Excel、CSV 数据存储需要从文件格式开始 Excel 读取文件中的数据,然后分析成所需的格式。而使用 YAML 这个过程完全省略了。 以上是关于其他链接的数据驱动,在后续章节中共享。 相关文章

  • UI 实战自动化测试(1) | 测试框架设计与 PageObject 改造

原文链接

获取更多技术文章分享 [外链图片存储失败,源站可能有防盗链机制,建议保存图片直接上传(img-Lffj2C4J-1650247773345)(WebSocket 通信协议于 2011 年被 IETF 定为标准 RFC 6455,并由 RFC7936 补充规范。WebSocket API (WebSocket API 是一个使用WebSocket 协议协议界面,建立全双工通道收发消息) 也被 W3C 定为标准。

而 HTTP 虽然协议不支持长期连接,但是 HTTP1.1 改进中,有一个 keep-alive,在一个 HTTP 多个连接可以在连接中发送 Request,接收多个 Response。

但是在 HTTP 中 Request = Response 永远是成立的,也就是说,一个 request 只能有一个response。而且这个response也是被动的,不能主动启动。

websocket 常用于社交/订阅、多玩家游戏、协作办公/编辑、股市基金报价、体育现场播放、音视频聊天/视频会议/在线教育、智能家居及基于位置的应用。

websocket 不能使用接口 requests 直接调用接口可以依靠第三方库实现调用。以下内容介绍如何调用第三方库实现 websocket 接口自动化测试。

实战

使用 python 语言实现 websocket 接口自动化

环境准备

1.安装 pyhton3 环境下载所需的运行库 2.下载所需的操作库 pip install websocket-client

实战演示

  • 连接 websoket 服务器
import logging from websocket import create_connection logger = logging.getLogger(__name__) url = 'ws://echo.websocket.org/' #在线回环websocket接口,必须以websocket连接后访问的方式不能直接在网页端输入地址访问 wss = create_connection(url, timeout=timeout)  
  • 发送 websocket 消息
  • wss.send(‘Hello World’)
 - 接收 websocket 消息 - ``` - res = wss.recv() - logger.info(res) 
  • 关闭 websocket 连接
  • wss.close()
  - websocket 第三方库的调用不支持直接发送除字符串以外的其他数据类型,因此在发送请求之前需要 Python 结构化格式转换为字符串类型或 json 字符串后,再发起 websocket 的接口请求 - ``` - #待发送的数据体格式为: - data= { -     "a" : "abcd", -     "b" : 123 -     } - # 发送前需要处理数据 json 字符串 - new_data=json.dumps(data,ensure_ascii=False) - wss.send(new_data) 
  格式转换处理响应内容:  

def load_json(base_str): if isinstance(base_str, str): try: res = json.loads(base_str) return load_json(res) except JSONDecodeError: return base_str elif isinstance(base_str, list): res = [] for i in base_str: res.append(load_json(i)) return res elif isinstance(base_str, dict): for key, value in base_str.items(): base_str[key] = load_json(value) return base_str return base_str

 - websocket 接口自动化测试,二次封装 demo 展示 - web_socket_util.py 封装 websocket 一般接口操作: - ``` - import logging - import json - from websocket import create_connection - logger = logging.getLogger(__name__) - class WebsocketUtil():     def conn(self, uri, timeout=3):             '''                     连接web服务器                             :param uri: 服务的url                                     :param timeout: 超时时间                                             :return:                                                     '''                                                             self.wss = create_connection(uri, timeut=timeout)
    def send(self, message):
            '''
                    发送请求数据体
                            :param message: 待发送的数据信息
                                    :return:
                                            '''
                                                    if not isinstance(message, str):
                                                                message = json.dumps(message)
                                                                        return self.wss.send(message)
    def load_json(self, base_str):
            '''
                    进行数据体的处理
                            :param base_str: 待处理的数据体
                                    :return:
                                            '''
                                                    if isinstance(base_str, str):
                                                                try:
                                                                                res = json.loads(base_str)
                                                                                                return self.load_json(res)
                                                                                                            except JSONDecodeError:
                                                                                                                            return base_str
                                                                                                                                    elif isinstance(base_str, list):
                                                                                                                                                res = []
                                                                                                                                                            for i in base_str:
                                                                                                                                                                            res.append(self.load_json(i))
                                                                                                                                                                                        return res
                                                                                                                                                                                                elif isinstance(base_str, dict):
                                                                                                                                                                                                            for key, value in base_str.items():
                                                                                                                                                                                                                            base_str[key] = self.load_json(value)
                                                                                                                                                                                                                                        return base_str
                                                                                                                                                                                                                                                return base_str
    def recv(self, timeout=3):
            '''
                    接收数据体信息,并调用数据体处理方法处理响应体
                            :param timeout: 超时时间
                                    :return:
                                            '''
                                                    if isinstance(timeout, dict):
                                                                timeout = timeout["timeout"]
                                                                        try:
                                                                                    self.settimeout(timeout)
                                                                                                recv_json = self.wss.recv()
                                                                                                            all_json_recv = self.load_json(recv_json)
                                                                                                                        self._set_response(all_json_recv)
                                                                                                                                    return all_json_recv
                                                                                                                                            except WebSocketTimeoutException:
                                                                                                                                                        logger.error(f"已经超过{timeout}秒没有接收数据啦")
    def settimeout(self, timeout):
            '''
                    设置超时时间
                            :param timeout: 超时时间
                                    :return:
                                            '''
                                                    self.wss.settimeout(timeout)
    def recv_all(self, timeout=3):
            '''
                    接收多个数据体信息,并调用数据体处理方法处理响应体
                            :param timeout: 超时时间
                                    :return:
                                            '''
                                                    if isinstance(timeout, dict):
                                                                timeout = timeout["timeout"]
                                                                        recv_list = []
                                                                                while True:
                                                                                            try:
                                                                                                            self.settimeout(timeout)
                                                                                                                            recv_json = self.wss.recv()
                                                                                                                                            all_json_recv = self.load_json(recv_json)
                                                                                                                                                            recv_list.append(all_json_recv)
                                                                                                                                                                            logger.info(f"all::::: {all_json_recv}")
                                                                                                                                                                                        except WebSocketTimeoutException:
                                                                                                                                                                                                        logger.error(f"已经超过{timeout}秒没有接收数据啦")
                                                                                                                                                                                                                        break
                                                                                                                                                                                                                                self._set_response(recv_list)
                                                                                                                                                                                                                                        return recv_list
    def close(self):
            '''
                    关闭连接
                            :return:
                                    '''
                                            return self.wss.close()
    def _set_response(self, response):
            self.response = response
    def _get_response(self) -> list:
            return self.response

test_case.py websocket 接口自动化测试用例:

class TestWsDemo:

    def setup(self):
            url = 'ws://echo.websocket.org/'
                    self.wss = WebsocketUtil()
                            self.wss.conn(url)
    def teardown(self):
            self.wss.close()
    def test_demo(self):
            data = {"a": "hello", "b": "world"}
                    self.wss.send(data)
                            res = self.wss.recv()
                                    assert 'hello' == res['a']

WebSocket 使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。

在 WebSocket API 中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。

⬇️ 点击“阅读原文”,提升测试核心竞争力! 原文链接

获取更多技术文章分享

标签: 隔离变送器ws1521

锐单商城拥有海量元器件数据手册IC替代型号,打造 电子元器件IC百科大全!

锐单商城 - 一站式电子元器件采购平台