知识图谱嵌入理论介绍: 简要总结一篇关于嵌入知识图谱的综述
KGE的诸多方法
KGE就是
现有的KGE方法可分为三类:
- 基于语义匹配(semantic matching based)
- 基于神经网络(neural network based)
,
TransE
该模型将关系视为从头到尾实体的翻译。 。
由于关系和实体都被表示为向量,另一种数学说法是向量(vector)空间中,,即: v h + v r ≈ v t v_h+v_r\approx v_t vh+vr≈vt
所以我们称之为
TransE的得分函数就是向量之间的欧氏距离的相反数: S c o r e ( h , r , t ) = − ∣ ∣ v h + v r − v t ∣ ∣ 2 2 Score(h,r,t)=-||v_h+v_r-v_t||_2^2 Score(h,r,t)=−∣∣vh+vr−vt∣∣22 损失函数定义为: m a x ( 0 , γ − S c o r e ( h , r , t ) + S c o r e ( h ′ , r , t ′ ) ) max(0,\gamma-Score(h,r,t)+Score(h',r,t')) max(0,γ−Score(h,r,t)+Score(h′,r,t′)) 也就是在embedding space中,正例三元组的得分要比负例三元组的得分高出 γ \gamma γ,又由于得分函数表示为距离的相反数,所以得分高代表距离近。即:正例三元组的距离要比负例三元组的距离小至少 γ \gamma γ长度的距离。
RESCAL
该模型的 S c o r e ( h , r , t ) = v h T M r v t Score(h,r,t)=v_h^TM_rv_t Score(h,r,t)=vhTMrvt
其中 v h ∈ R d , v t ∈ R d , M r ∈ R d × d v_h\in R^d,v_t\in R^d,M_r\in R^{d\times d} vh∈Rd,vt∈Rd,Mr∈Rd×d v h , v t v_h,v_t vh,vt都是从实体embedding矩阵(记为 E E E)中的(根据实体id获取)的vector, E E E的形状是(num_entities,d)。 M r M_r Mr是整个关系tensor(三维的)中的根据关系id获取的matrix(二维的)。整个关系tensor记为 R R R,形状是(num_relations,d,d),所以 M r M_r Mr的shape是( d , d d,d d,d)。
DistMult
DistMult是RESCAL的简化。具体来说就是RESCAL中每一个head和tail实体之间的关系r是用一个matrix表示。而DistMult中则用一个vector表示两个实体间的关系。 所以得分函数是三个vector之间的内积: < v h , v r , v t > <v_h,v_r,v_t> <vh,vr,vt> v h , v r , v t ∈ R d v_h,v_r,v_t\in R^d vh,vr,vt∈Rd
代码实现
获取数据
我们使用FB15K知识库
下载解压: FB15K知识库就是TransE这篇论文的作者从Freebase知识库中选取的一部分三元组构成的一个小规模的知识库
三元组数量 | 实体数量 | 关系数量 |
---|---|---|
592213 | 14951 | 1345 |
592213个三元组的划分情况是:
数据集划分 | 三元组数量 |
---|---|
训练集 | 483142 |
验证集 | 50000 |
测试集 | 59017 |
实现
实现代码需要说明的一点是,下面的代码采用Binary Cross Entropy loss作为损失函数,即,经过模型之后,输出一个向量,长度是num_entities,也就是所有实体的数量。
标签是尾实体的id,即告诉模型第几个位置的实体才是真正的尾实体,需要增加这个位置的概率,降低其余位置的概率。
导包
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import time
from collections import defaultdict
import argparse
from tqdm import tqdm
import os
处理数据
加载数据
def load_data(data_dir,data_type):
with open("%s%s.txt" % (data_dir, data_type), "r") as f:
data = f.read().strip().split("\n")
data = [i.split('\t') for i in data]
print(len(data),data_type)
return data
train_data=load_data(data_dir='/mnt/cfs/speech/nlp/work/xhsun/KGQA/Trans/FB15k/',data_type='freebase_mtr100_mte100-train')
valid_data=load_data(data_dir='/mnt/cfs/speech/nlp/work/xhsun/KGQA/Trans/FB15k/',data_type='freebase_mtr100_mte100-valid')
test_data=load_data(data_dir='/mnt/cfs/speech/nlp/work/xhsun/KGQA/Trans/FB15k/',data_type='freebase_mtr100_mte100-test')
data=train_data+valid_data+test_data
print(len(data))
统计所有的头实体、尾实体以及 关系:
entities = sorted(list(set([d[0] for d in data]+[d[2] for d in data])))
print(len(entities))
relations = sorted(list(set([d[1] for d in data])))
print(len(relations))
即14951个实体,1345个关系
构造entity2id和relation2id的字典映射
entity_idxs={
entities[i]:i for i in range(len(entities))}
relation_idxs={
relations[i]:i for i in range(len(relations))}
构造实体到id,关系到id的映射,这一步是NLP中必做的一步,因为我们要根据输入的实体,找到对应的id,进而找到对应的embedding
生成训练数据
train_data_idxs=[[entity_idxs[triplet[0]],relation_idxs[triplet[1]],entity_idxs[triplet[2]]] for triplet in train_data]
生成批次的数据输入
er_vocab=defaultdict(list)
for triplet in train_data_idxs:
er_vocab[(triplet[0],triplet[1])].append(triplet[2])
er_vocab_pairs=list(er_vocab.keys())
batch_inputs=er_vocab_pairs[:4]
batch_targets=torch.zeros([len(batch_inputs),len(entity_idxs)],dtype=torch.float32)
for i,pair in enumerate(batch_inputs):
batch_targets[i,er_vocab[pair]]=1
batch_inputs=np.array(batch_inputs)
以前4个三元组为例
。
以第一个三元组为例:
即,(3920,791,9220)是一个三元组,也就是输入数据的一个样本。(3920,791,3799)也是一个三元组。所以标签有两个1。 了解了输入输出,接下来就可以定义模型。
模型
class KGE(nn.Module):
def __init__(self,model_name,ent_vec_dim,num_entities,num_relations):
''' num_entities是所有实体的数量 num_relations是所有关系的数量 ent_vec_dim是每一个实体向量的维度 如果model_name是RESCAL,那么每一个关系用一个矩阵matrix表示,shape==(ent_vec_dim,ent_vec_dim) '''
super(KGE,self).__init__()
self.E=nn.Embedding(num_embeddings=num_entities,embedding_dim=ent_vec_dim,padding_idx=0)
self.model_name=model_name
self.ent_vec_dim=ent_vec_dim
self.num_entities=num_entities
if self.model_name=='RESCAL':
self.R=nn.Embedding(num_embeddings=num_relations,embedding_dim=ent_vec_dim*ent_vec_dim,padding_idx=0)
self.scoreFun=self.RESCAL
else:
self.R=nn.Embedding(num_embeddings=num_relations,embedding_dim=ent_vec_dim,padding_idx=0)
self.scoreFun=self.DistMult
def RESCAL(self,head_embed,rel_embed):
''' RESCAL模型将每一个关系用一个matrix表示。 输入: head_embed.size()==(batch_size,self.ent_vec_dim) rel_embed.size()==(batch_size,self.ent_vec_dim*2) 输出: score.size()==(batch_size,self.num_entities) '''
batch_size=head_embed.size(0)
head_embed=head_embed.view(batch_size,1,self.ent_vec_dim)
rel_embed=rel_embed.view(batch_size,self.ent_vec_dim,self.ent_vec_dim)
score=torch.mm(torch.squeeze(torch.bmm(head_embed,rel_embed),dim=1),self.E.weight.transpose(1,0))
return score
def DistMult(self,head_embed,rel_embed):
''' DistMult是RESCAL的简化版,将每一个关系用一个vector表示。 输入: head_embed.size()==(batch_size,self.ent_vec_dim) rel_embed.size()==(batch_size,self.ent_vec_dim) 输出: score.size()==(batch_size,self.num_entities) '''
score=torch.mm(head_embed*rel_embed,self.E.weight.transpose(1,0))
return score
def forward(self,head_idx,rel_idx):
''' 输入: head_idx.size()==rel_idx.size()==(batch_size,) 输出: probabilities.size()==(batch_size,self.num_entities) 即:预测每一个实体可以作为尾实体的概率 '''
batch_size=head_idx.size(0)
score=self.scoreFun(head_embed=self.E(head_idx),rel_embed=self.R(rel_idx))
assert score.size()==(batch_size,self.num_entities)
probabilities=torch.sigmoid(score)
return probabilities
前向传播
head_idx=torch.LongTensor(batch_inputs[:,0])
rel_idx=torch.LongTensor(batch_inputs[:,1])
RESCAL=KGE(model_name='RESCAL',ent_vec_dim=200,num_entities=len(entity_idxs),num_relations=len(relation_idxs))
DistMult=KGE(model_name='DistMult',ent_vec_dim=200,num_entities=len(entity_idxs),num_relations=len(relation_idxs))
probabilities1=RESCAL(head_idx,rel_idx)
probabilities2=DistMult(head_idx,rel_idx)