前言
最近做的一个实现。将ASTER光谱库中所有地物连续波谱响应的指定地物大类转换为指定卫星的通道比辐射率。最终形式是pd.Dataframe,列索引为地物名称,值为通道比辐射率。 根据文件名进行筛选和使用copy逐行阅读文件txt文本数据,插值,已知序列为积分。
代码
import os import numpy as np from shutil import * import matplotlib.pyplot as plt from scipy import integrate import pandas as pd #获取指定路径下所有文件的绝对路径 def listDir(path, list_name): """ :param path: 路径 :param list_name: path所有文件的绝对路径名列表 :return: """ for file in os.listdir(path): file_path = os.path.join(path, file) if os.path.isdir(file_path): listDir(file_path, list_name) else: list_name.append(file_path) #读取MERSI2光谱响应函数 #光谱库单位微米, μm 间隔0.002 , 响应为百分比; 卫星单位波数 μm = 10**4 / 波数 然后插值为0.01μm间隔 def readMERSI2_response(fileName, waveLength_MERSI2, response_MERSI2): with open(fileName, 'r') as f: line = f.readline() while line: try: wl, re = line.split() waveLength_MERSI2.append(float(wl)) response_MERSI2.append(float(re)) except: pass line = f.readline() origin_wl = 10**4/np.array(waveLength_MERSI2)[::-1] origin_re = np.array(response_MERSI2)[::-1] interp_wl = np.linspace(origin_wl.min(), origin_wl.max(), (origin_wl.max() - origin_wl.min())*50, endpoint=True) interp_re = np.interp(interp_wl, origin_wl, origin_re) MERSI2_response = np.vstack((interp_wl, interp_re))#输出插值为0.02μm间隔 MERSI2_response = np.vstack((origin_wl, origin_re))#输出原始为0.01μm间隔 return (MERSI2_response) #光谱库筛选植物红外光谱 def integralSpectrum(sourceDir, targetDir, keyword1, keyword2, keyword3, MERSI2_response24Dir, MERSI2_response25Dir): sourceNameList = os.listdir(sourceDir) #print(fileNameList) #vegetation and tir and spectrum for fileName in sourceNameList: if keyword1 in fileName and keyword2 in fileName and keyword3 in fileName: targetNameList = os.listdir(targetDir) if fileName in targetNameList: pass else: copy(os.path.join(sourceDir, fileName), targetDir) #光谱库单位微米, μm 间隔0.002 , 响应为百分数; 卫星单位波数 μm = 10**4 / 波数 然后插值为0.01μm间隔 waveLength_MERSI2 = [] response_MERSI2 = [] MERSI2_response_24 = readMERSI2_response(MERSI2_response24Dir, waveLength_MERSI2, response_MERSI2) waveLength_MERSI2 = [] response_MERSI2 = [] MERSI2_response_25 = readMERSI2_response(MERSI2_response25Dir, waveLength_MERSI2, response_MERSI2) #提取每条植物光谱响应 filePathNameList = [] listDir(targetDir, filePathNameList) nameList = [] waveLength = [] response = [] veg_responseList24 = [] vegNameList = filePathNameList for filePathName in vegNameList: if os.path.splitext(filePathName)[1] == '.txt': with open(filePathName, 'r') as f: try: line = f.readline() other, name = line.split(' ', 1) nameList.append(name) while line: #print(line) try: wl, re = line.split() waveLength.append(float(wl)) response.append(float(re)) except: pass line = f.readline() responseList = np.interp(MERSI2_response_24[0,:], np.array(waveLength), np.array(response)) veg_responseList24.append(responseList) waveLength = [] response = [] except: pass filePathNameList = [] listDir(targetDir, filePathNameList) nameList = [] waveLength = [] response = [] veg_responseList25 = [] vegNameList = filePathNameList for filePathName in vegNameList: if os.path.splitext(filePathName)[1] == '.txt': with open(filePathName, 'r') as f: try: line = f.readline() other, name = line.split(' ', 1) nameList.append(name) while line: #print(line) try: wl, re = line.split() waveLength.append(float(wl)) response.append(float(re)) except: pass line = f.readline() responseList = np.interp(MERSI2_response_25[0,:], np.array(waveLength), np.array(response)) veg_responseList25.append(responseList) waveLength = [] response = [] except: pass #所有植物光谱和MERSI2光谱响应进行积分成比辐射率 veg_b_24List = [] for response in veg_responseList24: #integrate.trapz(y, x) 前面放y值, 后面放x值 veg_b_up = integrate.trapz(MERSI2_response_24[1] * ((100-response)/100), MERSI2_response_24[0]) veg_b_down = integrate.trapz(MERSI2_response_24[1], MERSI2_response_24[0]) veg_b = veg_b_up / veg_b_down veg_b_24List.append(veg_b) veg_b_25List = [] for response in veg_responseList25: #integrate.trapz(y, x) 前面放y值, 后面放x值 veg_b_up = integrate.trapz(MERSI2_response_25[1] * ((100-response)/100), MERSI2_response_25[0]) veg_b_down = integrate.trapz(MERSI2_response_25[1], MERSI2_response_25[0]) veg_b = veg_b_up / veg_b_down veg_b_25List.append(veg_b) ''' Dict = DictList = [] for i in range(len(veg_b_24List)): Dict.append(nameList[i]) Dict.append(veg_b_24List[i]) DictList.append(Dict) Dict = [] veg_b_24Dict = dict(DictList) #解决无法直接dict列表变量方法即加一个[vaiable]即可 ''' dfName = pd.DataFrame(nameList, columns=['name']) df24 = pd.DataFrame(data=veg_b_24List, columns=['b24']) df25 = pd.DataFrame(data=veg_b_25List, columns=['b25']) df24 = dfName.join(df24) df = df24.join(df25) df_mean = df.groupby('name').mean() return df_mean if __name__ =='__main__': sourceDir = targetDir = MERSI2_response25Dir = keyword1 = 'vegetation' keyword2 = 'tir' keyword3 = 'spectrum' a = integralSpectrum(sourceDir, targetDir, keyword1, keyword2, keyword3, MERSI2_response24Dir, MERSI2_response25Dir)
import ASTER_MERSI2_response
import numpy as np
sourceDir = r'E:\ASTER光谱库'
targetDir = r'E:\ASTER光谱库\ASTER\Man-made'
MERSI2_response24Dir = r'E:\FY光谱响应\MERSI2_SRF\rtcoef_fy3_4_mersi2_srf_ch24.txt'
MERSI2_response25Dir = r'E:\FY光谱响应\MERSI2_SRF\rtcoef_fy3_4_mersi2_srf_ch25.txt'
keyword1 = 'manmade'
keyword2 = 'all'
keyword3 = 'spectrum'
result = ASTER_MERSI2_response.integralSpectrum(sourceDir, targetDir,
keyword1, keyword2,
keyword3, MERSI2_response24Dir,MERSI2_response25Dir)