1.安pyDAL库
pip install pyDAL
2.pyDAL库的强大之处在于,它可以和各种数据库进行交互(不限制于mysql),而且代码很容易在不同的数据库之间移植。
连接每个数据库的方法如下:(在下表中,我们统一假设数据库登录的账户是username,登录密码为password,主机地址为(host address)localhost,登录后数据库的名字为test)。详见以下链接。
web2py - The database abstraction layer
具体使用见下面的例子
sqlite://storage.sqlite |
|
mysql://username:password@localhost/test?set_encoding=utf8mb4 |
|
postgres://username:password@localhost/test |
|
mssql://username:password@localhost/test |
|
mssql3://username:password@localhost/test |
|
mssql4://username:password@localhost/test |
|
firebird://username:password@localhost/test |
|
oracle://username/password@test |
|
db2://username:password@test |
|
ingres://username:password@localhost/test |
|
sybase://username:password@localhost/test |
|
informix://username:password@test |
|
teradata://DSN=dsn;UID=user;PWD=pass;DATABASE=test |
|
cubrid://username:password@localhost/test |
|
sapdb://username:password@localhost/test |
|
imap://user:password@server:port |
|
mongodb://username:password@localhost/test |
|
google:sql://project:instance/database |
|
google:datastore |
|
google:datastore ndb |
3.本文以实现数据库的交互:
首先连接MySQL下图使用数据库MySQL数据库数据库交互平台,这些平台都差不多,可以随意下载安装。在主机上搭建数据库平台server,然后其他电脑可以通过client访问客户端。我在这里用的是HeidiSQL客户端:(如下所示,我登录的账户是root,登录密码为*****,主机地址为192.168.0.45
登录后数据库的名称是pqlongtermtest,然后我在下面建立了很多table,
不要说太多,直接在代码上,代码中的每一步都会得到具体的解释。代码所做的主要事情是建立数据库表格,然后在运行自动化代码时将数据上传到表格,并可以自动生成每个表格的主键id,相互引用,即实现表与表的交互。
import re #正则表达式库 from datetime import datetime #时间库 import numpy as np #数学计算库 import pydal ##我们需要的Python万能库与数据库互动 import requests #接口库,访问URL用的 from robot.api import logger #便于直接logger打印信息 from robot.libraries.BuiltIn import BuiltIn #这里BuiltIn是Robot Framework的交互库 from selenium.webdriver.common.keys import Keys #selenium操作鼠标键盘的库 from selenium.webdriver.remote.webelement import WebElement #selenium获得元素位置的库 class PQDatabase(object): def __init__(self, username, password, host, dbname): self.conn = None #声明,用于连接MySQL self.deviceinfotab = 'device_information' ##相应数据库中新建的表 self.test_run_history = 'test_run_history' ##对应数据库中的其他新表 self.measurandtab = 'measurement_test_result_history' ##对应数据库中还有其他新建的表 self.open_database_connection(username, password, host, dbname) ##将连接数据库的函数放置到__init__调用这个PQDatabase类可以连接数据库 #region Basic Operation ''' 注意前面使用的例子MySQL连接方式为 mysql://username:password@localhost/test?set_encoding=utf8mb4 ''' def open_database_connection(self, username, password, host, dbname): if self.conn is None: self.conn = pydal.DAL('mysql://{!s}:{!s}:{!s}@{!s}/{!s}?set_encoding=utf8' .format(username, password, host, dbname), fake_migrate=True) ''' 用于上述连接fake_migrate=True, (详细说明也可以看上面的英文链接,在链接里面讲到: 当数据库中有两个重复行时,只有一个类型strig一个为datetime类型时是不能兼容的,
但fake_migrate=True可以重构使之不报错,仍然能正常连接上数据库);
大体作用是因为数据库的一些特性会导致链接出错,
用fake_migrate=True可以减少连接错误。
'''
def close_database_connection(self): # 断开连接
self.conn.close()
#endregion
#region create Table
'''
以下都是创建数据库中表格的方法,先开辟出一片田地Field,
相当于在数据库中建立了一张Excel表格用于后续数据库赋值存数据,
其中self.conn已经在__init__中调用的open_database_connection进行了赋值,
而pydal.DAL(...).define_table(para1,*para2)则是建表,
pydal.DAL(...)也即self.conn;参数para1为所建表的名称,参数*para2则为所建立表的column
'''
def create_device_info_table(self):
self.conn.define_table(
self.deviceinfotab,
pydal.Field('ip', type='string', length=50),
pydal.Field('device_type', type='string', length=50),
pydal.Field('serial_no', type='string', length=50),
pydal.Field('mlfb', type='string', length=50),
pydal.Field('mac', type='string', length=50),
pydal.Field('hw_version', type='string', length=50),
pydal.Field('sd_card_vendor', type='string', length=50),
)
def create_test_run_history_table(self):
self.conn.define_table(
self.test_run_history,
pydal.Field('device_information_id', type='integer', length=50),
pydal.Field('firmware_version', type='string', length=50),
pydal.Field('bootloader_version', type='string', length=50),
pydal.Field('parameter_set_version', type='string', length=50),
pydal.Field('firmware_package_version', type='string', length=50),
pydal.Field('test_start_time', type='datetime', length=50),
pydal.Field('signal_equipment', type='string', length=50),
pydal.Field('signal_id', type='string', length=50),
pydal.Field('network_type', type='string', length=50),
pydal.Field('u_din', type='double', length=50),
pydal.Field('i_din', type='double', length=50),
pydal.Field('f', type='string', length=50),
pydal.Field('pt', type='string', length=50),
pydal.Field('ct', type='string', length=50),
pydal.Field('case_version', type='string', length=50),
pydal.Field('uncertainty_version', type='string', length=50),
pydal.Field('audit_status', type='string', length=50),
pydal.Field('create_time', type='datetime', length=50),
)
def create_measurand_test_table(self):
self.conn.define_table(
self.measurandtab,
pydal.Field('Test_Run_History_Id', type='string', length=11),
pydal.Field('Case_Version', type='string', length=20),
pydal.Field('Case_Id', type='string', length=20),
pydal.Field('Uncertainty_Version', type='string', length=20),
pydal.Field('Uncertainty_Id', type='string', length=20),
pydal.Field('Measurand', type='string', length=20),
pydal.Field('Expected_Value', type='string', length=20),
pydal.Field('Allowed_Uncertainty', type='string', length=20),
pydal.Field('Lower_Threshold', type='string', length=20),
pydal.Field('Upper_Threshold', type='string', length=25),
pydal.Field('Min_Value', type='string', length=25),
pydal.Field('Max_Value', type='string', length=25),
pydal.Field('Average_Value', type='string', length=25),
pydal.Field('Actual_Uncertainty', type='string', length=20),
pydal.Field('Test_Result', type='string', length=254),
)
#endregion
#region store log
'''
上面建立了表,这里就是往表的column里面添加元素,这里用self.conn[整张表名].bulk_insert(),
'''
def store_measurand_log_entries(self, data):
try:
for row in data:
'''
这里data为一个以字典为元素的列表,row即为字典,
注意bulk_insert([{其中为key:value的模式,key即为上面建立的column,而value为row里面的value}])
'''
self.conn[self.measurandtab].bulk_insert([{
'Test_Run_History_Id': '{}'.format(row['Test_Run_History_Id']),
'Case_Version': '{}'.format(row['Case_Version']),
'Case_Id': '{}'.format(row['Case_Id']),
'Uncertainty_Version': '{}'.format(row['Uncertainty_Version']),
'Uncertainty_Id': '{}'.format(row['Uncertainty_Id']),
'Measurand': '{}'.format(row['Measurand']),
'Expected_Value': '{}'.format(row['Expected_Value']),
'Allowed_Uncertainty': '{}'.format(row['Allowed_Uncertainty']),
'Lower_Threshold': '{}'.format(row['Lower_Threshold']),
'Upper_Threshold': '{}'.format(row['Upper_Threshold']),
'Min_Value': '{}'.format(row['Min_Value']),
'Max_Value': '{}'.format(row['Max_Value']),
'Average_Value': '{}'.format(row['Average_Value']),
'Actual_Uncertainty': '{}'.format(row['Actual_Uncertainty']),
'Test_Result': '{}'.format(row['Test_Result']),
}])
self.conn.commit()
'''
insert完数值后要commit提交,因为数据库本身的原因insert, truncate, delete, and update都要commit才能真正提交,而create和drop则可以即时执行。
'''
except Exception as e:
'''
接收错误赋给e
'''
self.conn.rollback()
'''
rollback的作用是回退到上个commit,上个commit之后的操作全部忽略。
这里放在except后,就是当前面commit出错后,清除这些commit。
'''
raise Exception('Insert measurand result failed: {!s}'.format(e)) #抛出错误e
def store_device_information(self, data):
try:
self.conn[self.deviceinfotab].bulk_insert([{
'ip': '{}'.format(data['IP']),
'device_type': '{}'.format(data['DeviceType']),
'serial_no': '{}'.format(data['SerialNo']),
'mlfb': '{}'.format(data['MLFB']),
'mac': '{}'.format(data['MAC']),
'hw_version': '{}'.format(data['HWVersion']),
'sd_card_vendor': '{}'.format(data['SDCardVendor']),
}])
self.conn.commit()
except Exception as e:
self.conn.rollback()
raise Exception('Insert device information failed: {!s}'.format(e))
def store_test_run_history(self, data):
try:
for row in data:
self.conn[self.test_run_history].bulk_insert([{
'device_information_id': '{}'.format(row['device_information_id']),
'firmware_version': '{}'.format(row['FirmwareVersion']),
'bootloader_version': '{}'.format(row['BootloaderVersion']),
'parameter_set_version': '{}'.format(row['ParameterSetVersion']),
'firmware_package_version': '{}'.format(row['FirmwarePackageVersion']),
'test_start_time': '{}'.format(row['test_start_time']),
'signal_equipment': '{}'.format(row['signalEquipment']),
'signal_id': '{}'.format(row['signalId']),
'network_type': '{}'.format(row['networkType']),
'u_din': '{}'.format(row['Udin']),
'i_din': '{}'.format(row['Idin']),
'f': '{}'.format(row['frequency']),
'pt': '{}'.format(row['pt']),
'ct': '{}'.format(row['ct']),
'case_version': '{}'.format(row['caseVersion']),
'uncertainty_version': '{}'.format(row['uncertaintyVersion']),
}])
self.conn.commit()
test_run_history_id = self.get_latest_test_run_history_id_from_table()
except Exception as e:
self.conn.rollback()
raise Exception('Insert device information failed: {!s}'.format(e))
return test_run_history_id
#endregion
'''
如果可能的话,最好在表中包含一个id类型的字段作为主键
(如果不指定这样的字段,DAL会自动包含一个名为“id”的字段)。
上面建立表后,表中会有一个自动生成的column,这个column为主键id,这个id的值是自动递增的。
'''
'''
如下函数是通过select进行筛选,它是用于选择表行,
用法是self.conn(定出筛选范围,相当于初步筛选).select(进一步筛选出表行),
在下面的初步筛选中用到了self.conn[self.deviceinfotab]['ip'] == data['IP'],这是在self.deviceinfotab表中找到'ip'的那一列,
然后这一栏中找到与输入的IP(入参)相等的那些行,
并且后面加了很多&,是一条一条逐列比对,直到找到所有符合这些特征的行;
最后将得到一个由字典作为元素的列表,然后用[0]索引出这个字典。
做这些的目的就是取出这行元素后,再从其中找到主键id的值,便于其他表格引用。
'''
def get_device_id_from_device_info_table(self, data):
deviceInfo = self.conn((self.conn[self.deviceinfotab]['ip'] == data['IP'])
& (self.conn[self.deviceinfotab]['device_type'] == data['DeviceType'])
& (self.conn[self.deviceinfotab]['serial_no'] == data['SerialNo'])
& (self.conn[self.deviceinfotab]['mlfb'] == data['MLFB'])
& (self.conn[self.deviceinfotab]['mac'] == data['MAC'])
& (self.conn[self.deviceinfotab]['hw_version'] == data['HWVersion']) & (self.conn[self.deviceinfotab]['sd_card_vendor']
== data['SDCardVendor'])).select(self.conn[self.deviceinfotab].ALL)[0]
return deviceInfo["id"]
'''
下面的函数首先初步筛选出表格。
然后用select,其中第一个入参是全选整张表格,然后orderby是排序,加~是倒序排列,
因为id是自动递增的,所以倒序排列后取第一行就可以得到最新的一个id。
limitby是限制出行(0,1)就是指第一行。
第一行被取出后,也是column做key,value则为第一行对应的值,组成一个字典;
这个字典成为列表的元素,而列表则作为整个self.conn().select()函数调用过程得到的结果。
然后取列表的第一个元素[0]就可以取出最新的一行,赋值给run,然后run["id"]取出最新的一行的id。
'''
def get_latest_test_run_history_id_from_table(self):
run = self.conn(self.conn[self.test_run_history]).select(self.conn[self.test_run_history].ALL, orderby=~self.conn[self.test_run_history].id, limitby=(0, 1))[0]
'''
这里将最新的一条记录从数据库取出,取出的这一列相当于是一个字典,
这个字典是以标题column为key,具体存放数据为value,然后这里的取法是先用self.conn[self.test_run_history])定出是哪一张表格,然后再在这张表格里面用select操作。
'''
return run["id"]
'''
下面一个函数应该是最实用的一个函数,self.conn.executesql(数据库命令),
这里调用executesql,可以直接识别数据库的命令,那样只要知道数据库命令,调用这个函数就相当于对数据库直接做操作。
我在下面的数据库命令中主要是判断在整张表中是否有符合我入参的行,
而且注意使用format,因为如果把data['IP']等写到用单引号引用的数据库命令中,会导致Python无法识别,数据库更加无法识别。
而且注意,表格的名字直接用device_information,而不是self.conn[self.deviceinfotab],因为数据库也根本不能识别这个函数模式。
'''
def validate_device_information_exists(self, data):
sql_order = 'SELECT * FROM device_information WHERE ip="{0}" AND device_type="{1}" AND serial_no="{2}" AND mlfb="{3}" AND mac="{4}" AND hw_version="{5}" AND sd_card_vendor="{6}"'.format(data['IP'], data['DeviceType'], data['SerialNo'], data['MLFB'], data['MAC'], data['HWVersion'], data['SDCardVendor'])
Info = self.conn.executesql(sql_order)
if Info:
return True
else:
return False
总结:在Python中,做到一件逻辑复杂的事情可以将这个事情分成一个个小块,然后复杂的逻辑用if,else等实现即可
class _ComprehensiveKeywords(_Context):
def __init__(self, *args):
super().__init__(*args)
self._device = _DeviceKeywords(*args)
self._seleniumext = _SeleniumExtKeywords(*args)
self._logging = _LoggingKeywords(*args)
self._common = _CommonKeywords(*args)
self._recording = _RecordingKeywords(*args)
self._configuration = _ConfigurationKeywords(*args)
self._db = None
#region store general information
def get_sd_card_vendor(self):
self._sel.go_to(self._device.get_device_debug_address('/sdcardstatistic'))
self._sel.wait_until_page_contains_element('css:a[href=sdcardstatistic2]')
statsText = self._sel.find_element('tag:pre').text
sdCardVendor = re.search(r"^\s*sd card vendor:\s*(.*?)\s*$", statsText, re.I | re.M)[1]
if sdCardVendor:
return sdCardVendor
else:
raise ValueError('Can\'t get SD card type.')
def get_generalinfo_list(self, ipAddress, getSDCardVendor=True):
ipAddress = str(ipAddress)
getSDCardVendor = robottypes.is_truthy(getSDCardVendor)
self._sel.go_to(self._device.get_device_address('/InfoDeviceInfo.html'))
self._sel.wait_until_page_contains_element('id:footer', timeout=None, error='Device information page not found')
devinfoDic = {}
tblRows = self._sel.find_elements('css:#workingArea > table > tbody > tr')
regex = re.compile(r'[ ()]')
for row in tblRows: # type: WebElement
cells = self._element_finder.find('tag:td', first_only=False, required=False, parent=row)
if cells:
keys = regex.sub('', cells[0].text.lower())
devinfoDic[keys] = cells[1].text
if _DeviceKeywords.active_device().deviceType in [SupportedDevice.Q200]:
if getSDCardVendor:
sdCardVendor = self.get_sd_card_vendor()
else:
sdCardVendor = 'NULL'
else:
sdCardVendor = 'NULL'
if _DeviceKeywords.active_device().deviceType in [SupportedDevice.Q200, SupportedDevice.Q100, SupportedDevice.P855, SupportedDevice.P850]:
devinfoDic['bootloaderversion'] = 'NULL'
if '?' in devinfoDic['localtime']:
devinfoDic['localtime'] = devinfoDic['localtime'].replace('?', ':')
devinfoDic['localtime'] = datetime.strptime(devinfoDic['localtime'][0:19], '%Y-%m-%d %H:%M:%S')
generalInfoDic = {
'IP': ipAddress,
'DeviceType': devinfoDic['devicetype'],
'SerialNo': devinfoDic['serialnumber'],
'MLFB': devinfoDic['ordernumbermlfb'],
'MAC': devinfoDic['macaddress'],
'HWVersion': 'NULL',
'SDCardVendor': str(sdCardVendor),
'FirmwareVersion': devinfoDic['firmwareversion'],
'BootloaderVersion': devinfoDic['bootloaderversion'],
'ParameterSetVersion': devinfoDic['parametersetversion'],
'FirmwarePackageVersion': devinfoDic['firmwarepackageversion'],
'DeviceLocalTime': devinfoDic['localtime'],
}
logger.info('The device general info is {}'.format(generalInfoDic))
return generalInfoDic
def store_device_information_into_database(self, username, password, host, dbname):
generalInfoDic = self.get_generalinfo_list(_DeviceKeywords.active_device().ipAddress)
self._db = PQDatabase(username, password, host, dbname)
try:
self._db.create_device_info_table()
if self._db.validate_device_information_exists(generalInfoDic) is False:
self._db.store_device_information(generalInfoDic)
device_id = self._db.get_device_id_from_device_info_table(generalInfoDic)
return device_id
finally:
self._db.close_database_connection()
def store_test_run_information_into_database(self, username, password, host, dbname, caseVersion, uncertaintyVersion):
self.test_run_history_id = None
generalInfoDic = self.get_generalinfo_list(_DeviceKeywords.active_device().ipAddress)
device_id = self.store_device_information_into_database(username, password, host, dbname)
configuration = self._configuration.get_ac_measurement_configuration()
pt = '{}:{}'.format(configuration.primRatedVoltage, configuration.secRatedVoltage)
ct = '{}:{}'.format(configuration.primRatedCurrent, configuration.secRatedCurrent)
if configuration.networkType == ACMNetworkType.SINGLE_PHASE:
renamed_networkType = "1P2W"
elif configuration.networkType == ACMNetworkType.THREE_WIRE_THREE_PHASE_BALANCED:
renamed_networkType = "3P3WB"
elif configuration.networkType == ACMNetworkType.THREE_WIRE_THREE_PHASE_UNBALANCED_2L:
renamed_networkType = "3P3W2I"
elif configuration.networkType == ACMNetworkType.THREE_WIRE_THREE_PHASE_UNBALANCED_3L:
renamed_networkType = "3P3W3I"
elif configuration.networkType == ACMNetworkType.FOUR_WIRE_THREE_PHASE_BALANCED:
renamed_networkType = "3P4WB"
elif configuration.networkType == ACMNetworkType.FOUR_WIRE_THREE_PHASE_UNBALANCED:
renamed_networkType = "3P4WU"
extended = {
'signalEquipment': BuiltIn().get_variable_value('${SignalModel}'),
'signalId': BuiltIn().get_variable_value('${OmicronSerial}'),
'frequency': configuration.frequency.name,
'networkType': renamed_networkType,
'Udin': configuration.primNomVoltage,
'Idin': configuration.primRatedCurrent,
'pt': pt,
'ct': ct,
'device_information_id': device_id,
'test_start_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'caseVersion': caseVersion,
'uncertaintyVersion': uncertaintyVersion,
}
info = [{**generalInfoDic, **extended}]
self._db = PQDatabase(username, password, host, dbname)
try:
self._db.create_test_run_history_table()
self.test_run_history_id = self._db.store_test_run_history(info)
finally:
self._db.close_database_connection()
#endregion
#region store longterm test log
#region store measurement test log
def create_measurand_test_log(self, caseVersion, caseId, uncertaintyVersion, uncertaintyId, measurand, allowedUncertainty, expectedValue=None, measuredValues=None, actualUncertainty=None, errorOccured=None):
caseVersion = str(caseVersion)
caseId = str(caseId)
uncertaintyVersion = str(uncertaintyVersion)
uncertaintyId = str(uncertaintyId)
measurand = str(measurand)
allowedUncertainty = float(allowedUncertainty)
if expectedValue is not None: # Used for IEC62586/IEC61557 basic measurand and power test
expectedValue = round(float(expectedValue), 4)
lower_threshold = round((expectedValue - allowedUncertainty), 4)
upper_threshold = round((expectedValue + allowedUncertainty), 4)
else:
expectedValue = 'NULL'
lower_threshold = 'NULL'
upper_threshold = 'NULL'
if measuredValues is not None: # Used for IEC62586/IEC61557 basic measurand and power test
measuredValues = [*map(float, measuredValues)]
measuredValues = list(measuredValues)
measuredValues = sorted(measuredValues)
logger.info("The measured values are {}".format(measuredValues))
if np.isinf(measuredValues).any():
minValue = 999999 #999999 is inf overflow, due to database can't handel overflow and invalid value
maxValue = 999999
averageValue = 999999
elif np.isnan(measuredValues).any():
minValue = 666666 #666666 is nan invalid
maxValue = 666666
averageValue = 666666
else:
logger.info("into")
minValue = round(measuredValues[0], 4)
maxValue = round(measuredValues[-1], 4)
sumValue = 0
for value in measuredValues:
sumValue = sumValue + value
averageValue = round(sumValue/len(measuredValues), 4)
logger.info("max value is {}, min value is {}, average value is {}".format(maxValue, minValue, averageValue))
else:
minValue = 'NULL'
maxValue = 'NULL'
averageValue = 'NULL'
if actualUncertainty is not None: #Used for IEC61557/IEC62053 energy test
actualUncertainty = float(actualUncertainty)
else:
actualUncertainty = 'NULL'
if errorOccured is False:
testResult = 'Pass'
else:
testResult = 'Failed'
measurand_log = {
'Case_Version': caseVersion,
'Case_Id': caseId,
'Uncertainty_Version': uncertaintyVersion,
'Uncertainty_Id': uncertaintyId,
'Measurand': measurand,
'Expected_Value': expectedValue,
'Allowed_Uncertainty': allowedUncertainty,
'Lower_Threshold': lower_threshold,
'Upper_Threshold': upper_threshold,
'Min_Value': minValue,
'Max_Value': maxValue,
'Average_Value': averageValue,
'Actual_Uncertainty': actualUncertainty,
'Test_Result': testResult,
}
return measurand_log
def store_measurand_test_result_into_database(self, username, password, host, dbname, dataCluster):
runId = self.test_run_history_id
infoList = []
generalInfoDic = {
'Test_Run_History_Id': str(runId),
}
for data in dataCluster:
info = {**generalInfoDic, **data}
infoList.append(info)
self._db = PQDatabase(username, password, host, dbname)
try:
self._db.create_measurand_test_table()
self._db.store_measurand_log_entries(infoList)
finally:
self._db.close_database_connection()
#endregion