[源码解析] 深度学习分布式训练框架 horovod (18) — kubeflow tf-operator
文章目录
- [源码解析] 深度学习分布式训练框架 horovod (18) --- kubeflow tf-operator
-
- 0x00 摘要
- 0x01 背景知识
-
- 1.1 Kubernetes
- 1.2 容器作为调度单元
- 1.3 Kubeflow
- 1.4 Tensorflow on Kubeflow
- 1.5 Operator
- 1.6 TF-Operator
- 0x02 TensorFlow 分布式
-
- 2.1 Parameter server架构
- 2.2 Tensorflow PS-Worker
-
- 2.2.1 架构
- 2.2.2 代码
- 0x03 TF-Operator
-
- 3.1 TF-Operator 设计思路
- 3.2 架构图
-
- 3.2.1 什么是Pod
- 3.2.2 为什么要有 service
- 3.2.3 什么是 controller
- 3.3 Spec
- 3.4 TFJob
- 3.5 角色
-
- 3.5.1 定义
- 3.5.2 创建角色
- 3.5.3 如何区分 master
- 0x04 Contoller
-
- 4.1 K8S CRD关键概念
- 4.2 定义
- 4.3 入口
- 4.4 syncHandler
- 4.5 ReconcileJobs
- 4.6 处理 Pod
-
- 4.6.1 ReconcilePods
- 4.6.2 createNewPod
- 4.6.3 生成配置信息
-
- 4.6.3.1 SetClusterSpec
- 4.6.3.2 genTFConfigJSONStr
- 4.6.3.3 genClusterSpec
- 4.6.4 CreatePodsWithControllerRef
- 4.6.5 createPods
- 4.7 处理服务
-
- 4.7.1 ReconcileServices
- 4.7.2 CreateNewService
- 4.7.3 CreateServicesWithControllerRef
- 4.7.4 createServices
- 0x05 比较一般部署
-
- 5.1 运行
- 5.2 比较
-
- 5.2.1 普通 TF
- 5.2.2 TF-Operator
- 0x06 总结
- 0xEE 个人信息
- 0xFF 参考
0x00 摘要
Horovod 是一款基于 AllReduce 分布式训练框架 TensorFlow、PyTorch 支持主流深度学习框架,优化通信,Horovod 广泛应用于数据并行训练。
通过十几篇文章,我们一步步分析 Horovod 各个方面。下一步是面对 Horovod on K8S 这座大山。
本文及以下文章的目的是:通过分析学习 Horovod on K8S 功能,梳理相关概念,希望找出设计思路。所以成文方式是:整理学习了很多网上文章,然后自己分析代码。特此对各位作者深表感谢。
本文是 horovod on k8s 介绍相关概念和必要前提kubeflow 社区的 tf-operator。
本系列其他文章链接如下:
[\源码解析] 深度学习分布式训练框架 Horovod — (1) 基础知识
[\源码解析] 深度学习分布式训练框架 horovod (2) — 从用户的角度切入
[\源码解析] 深度学习分布式训练框架 horovod (3) — Horovodrun背后做了什么
[\源码解析] 深度学习分布式训练框架 horovod (4) — 网络基础 & Driver
[\源码解析] 深度学习分布式训练框架 horovod (5) — 融合框架
[\源码解析] 深度学习分布式训练框架 horovod (6) — 后台线程架构
[\源码解析] 深度学习分布式训练框架 horovod (7) — DistributedOptimizer
[源码解析] 深度学习分布式训练框架 horovod (8) — on spark
[源码解析] 深度学习分布式训练框架 horovod (9) — 启动 on spark
[源码解析] 深度学习分布式训练框架 horovod (10) — run on spark
[源码解析] 深度学习分布式训练框架 horovod (11) — on spark — GLOO 方案
[源码解析] 深度学习分布式训练框架 horovod (12) — 弹性训练的整体结构
[源码解析] 深度学习分布式训练框架 horovod (13) — 弹性训练之 Driver
[源码解析] 深度学习分布式训练框架 horovod (14) — 如何发现节点挂了?
[源码解析] 深度学习分布式训练框架 horovod (15) — 广播 & 通知
[源码解析] 深度学习分布式训练框架 horovod (16) — 弹性训练之Worker生命周期
[源码解析] 深度学习分布式训练框架 horovod (17) — 弹性训练容错
0x01 背景知识
1.1 Kubernetes
kubernetes,简称K8s,用8代替8个字符ubernete而缩写。它是一个开源,用于管理云平台多个主机上的容器化应用,Kubernetes目标是使部署容器化的应用简单高效(powerful),Kubernetes一种应用部署、规划、更新、维护的机制。
由于它提供了通过容器使用不同机器学习框架的灵活性和按需扩展的敏捷性,因此深度神经网络训练选项越来越流行。
面对复杂的模型训练或大量的数据,单机的计算能力往往不能满足计算能力的要求。使用阿里巴巴 AiACC 或者社区的 horovod 等分布式训练框架可以将单机训练任务扩展到支持分布式训练任务,只需修改几行代码。
在 Kubernetes 上常见的是 kubeflow 社区的 tf-operator 支持 Tensorflow PS 模式,或者 mpi-operator 支持 horovod 的 mpi allreduce 模式。
1.2 容器作为调度单元
为什么希望使用容器来作深度学习系统的调度单元?因为容器拉取/启动快速。隔离资源效果好。抽象来看,可以将容器的image作为job的一部分分发调度执行。当然容器化后会引入gpu,网络等性能的代价。
比如 nvidia gpu 对docker提供了支持,nvidia-docker可以代替docker执行create和run操作。下图就是nvidia-docker架构。
1.3 Kubeflow
是一个开源的 Kubernetes 原生平台,用于开发、编排、部署和运行可扩展的便携式机器学习工作负载。Kubeflow 可以在任何Kubernetes 集群上运行。
Kubeflow可以很好的管理多机任务,Kubeflow的名字比较简单,为Kubernetes + TensorFlow,是一个机器学习工具包,是运行在K8s之上的一套技术栈,这套技术栈包含了很多组件,组件之间的关系比较松散,我们可以配合起来用,也可以单独用其中的一部分。
Kubeflow 询问 Kubernetes 计划分配哪几台机器来运行一个分布式作业中的各个进程,随后告 知每个进程,所有其他进程的 IP 地址和 port。从而保证一个作业里各个进程 之间互相知道对方。
为什么需要让所有进程互相知道对方呢?这是 TensorFlow ps-based distribution 方式要求的。TensorFlow 1.x 原生的分布 式训练功能让一个作业中所有进程都执行 TensorFlow 1.x runtime 程序。这些 进程互相通信,互相协调成为一个“分布式 runtime“,来解释执行表示深度学习 计算过程的计算图(graph)。在开始分布式训练之初,graph 被 TensorFlow runtime 拆解成若干子图;每个进程负责执行一个子图 —— 任何一个进程失败 (可能是被更高优先级作业抢占),则整个大图的执行就失败了。所以 TensorFlow 原生的分布式训练能力不是容错的(fault-tolerant)。不过, 它是可以从错误恢复(fault-recoverable)—— TensorFlow API 提供 checkpoint 的能力;如果一个作业失败了,可以重启作业,从最近的 checkpoint 开始继续执行。
1.4 Tensorflow on Kubeflow
Kubeflow 支持两种不同的 Tensorflow 框架分布式训练方法。
- 第一种是原生 Tensorflow 架构,它依赖于集中式参数服务器来实现工作线程之间的协调。
- 第二种是分散式方法,工作线程通过 MPI AllReduce 原语直接相互通信,不使用参数服务器。NVIDIA 的 NCCL 库已经在GPU 上有效地执行了大部分 MPI 原语,而 Uber 的Horovod 让使用 TensorFlow 执行多 GPU 和多节点训练变得轻而易举。与参数服务器相比,第二种方法可以更好地优化带宽和更好地扩展。
1.5 Operator
Operator 是Kubernetes 之中的概念,主要是用来打包、部署及管理用户的任务。
Operator可以简单理解为 CRD + Controller。
- CRD(Custom Resource Definition)是 Kubernetes 的扩展类型,用来为用户自定义资源提。
- Controller 用来让用户操作CRD。
如果用 Java 来比喻,operator 就是 Class,CRD 就是类的成员变量,Controller 就是类成员方法。
1.6 TF-Operator
虽然KubeFlow
提供了一大堆组件,涵盖了机器学习的方方面面,但模型训练肯定是KubeFlow
最重要的功能。 KubeFlow
针对各种各样的机器学习框架提供了训练的能力。方式是定义了各种各样的Operator
,其主要是用来管理机器学习或者深度学习里面的任务,比如如何管理维护一个任务的多个节点,如何管理Pod及任务的生命周期,如何进行容错等等。
TF-Operator
就是开源社区基于K8S
提供的扩展API
,提供了TensorFlow
的训练能力,从名字也能看出来,这个实现是类似Job
的一种方式,其特点如下:
- 提供TensorFlow原生PS-worker架构 的多机训练
- 推荐将PS和worker一起启动
- 通过service做服务发现
- 在社区中最早期的Operator
因为 TF-Operator 是社区中最早期的Operator,所以我们有必要先看看。
0x02 TensorFlow 分布式
因为 TF-Operator 是为了支持 Tensorflow PS 模式,所以我们首先介绍一下 TensorFlow 分布式。
2.1 Parameter server架构
在Parameter server架构(PS架构)中,集群中的节点被分为两类:参数服务器(parameter server)和工作服务器(worker)。其中参数服务器存放模型的参数,而工作服务器负责计算参数的梯度。在每个迭代过程,工作服务器从参数服务器中获得参数,然后将计算的梯度返回给参数服务器,参数服务器聚合从工作服务器传回的梯度,然后更新参数,并将新的参数广播给工作服务器。
PS-Worker 架构的梯度更新有着 和 两种方式:
在同步训练中, 所有的Worker设备采用同一个Batch的不同小批(mini-batch)数据来训练,等待所有设备该批次的梯度计算完成后,模型才会根据所有的梯度进行一次参数更新,然后PS将更新后的模型下发到各个设备。
异步训练中,没有设备需要去等待其他设备的梯度计算和参数更新,所有设备独立算并与将梯度结果更新到中心节点(PS)。异步训练总体会,但是异步训练的一个很严重的问题是,刚开始所有设备采用相同的参数来训练,但是异步情况下,某个设备完成一步训练后,可能发现模型参数已经被其它设备更新过了,此时这个设备计算出的梯度就过期了。
2.2 Tensorflow PS-Worker
2.2.1 架构
这里只是大致介绍一下,主要是为了和 TF-Operator 对比。
TF 把Job主要划分为Parameter Server和Worker(因为 TF 版本不同,所以有不同阶段的特别定义,比如 master 或者 chief)。
- Parameter Job:执行模型相关的作业,包括模型参数存储,分发,汇总,更新;作为分布式训练的服务端,等到各个终端(supervisors)来连接。
- Worker Job: 在TensorFlow的代码注释中被称为supervisors,执行训练相关的作业,包括推理计算和梯度计算。如果参数的数量太大,一台机器处理不了,这就要需要多个Tasks(动态上理解,主机上的一个进程,从静态的角度理解,
Task
就是我们写的代码)。 - Chief supervisors:在众多运算终端中必须选中一个作为主要的运算终端。该终端是在运算终端中最先启动的,它的功能是合并各个终端运算后的学习参数,将其保存再写入。
- Cluster 是 Jobs 的集合: Cluster(集群) 就是集群系统。
每个具体角色网络标识都是唯一的,即分布在不同IP的机器上(或者同一主机但不同端口号)。
在实际运行中,各个角色的网络构建部分代码必须完全相同,Ps-worker 架构分布式模型的流程大致如下:
-
pull : 各个worker根据数据流图的拓扑结构,从PS拉取最新的模型参数
-
feed: 各worker填充不同的批数据
-
compute: 各worker按照相同的模型参数和不同的批数据计算梯度,得出不同的梯度值
-
push 各worker 将计算得到的梯度值上传给PS
-
update: PS 收集所有worker的梯度值,求平均值,更新模型参数。
2.2.2 代码
具体逻辑如下:
Task
需要知道集群上都有哪些主机,以及它们都监听什么端口。tf.train.ClusterSpec()
就是用来描述这个。- 这个
Cluster
(集群)有两个Job
(worker.ps),worker
中有三个Task
(即,有三个Task
执行Tensorflow op
操作) - 将
ClusterSpec
当作参数传入到tf.train.Server()
中,同时指定此Task
的Job_name
和task_index
。 - 由于是相同的代码运行在不同的主机上,所以要传入
job_name
和task_index
加以区分,而ps_hosts
和worker_hosts
对于所有主机来说,都是一样的,用来描述集群的。 - 一个tf.train.Server包含了本地设备(GPUs,CPUs)的集合,可以连接到到其它task的ip:port(存储在cluster中), 还有一个session target用来执行分布操作。还有最重要的一点就是,它创建了一个服务器,监听port端口,如果有数据传过来,他就会在本地执行(启动session target,调用本地设备执行运算),然后结果返回给调用者。
- 为了使ps_server能够一直处于监听状态,我们需要使用server.join()。这时,进程就会block在这里.至于为什么ps_server刚创建就join呢,原因是因为下面的代码会将参数指定给ps_server保管,所以ps_server静静的监听就好了。
# To build a cluster with two ps jobs on hosts ps0 and ps1, and 3 worker
# jobs on hosts worker0, worker1 and worker2.
cluster_spec = {
"ps": ["ps0:2222", "ps1:2222"],
"worker": ["worker0:2222", "worker1:2222", "worker2:2222"]}
# Create a cluster from the parameter server and worker hosts.
cluster = tf.train.ClusterSpec({
"ps": ps_hosts, "worker": worker_hosts})
# Create and start a server for the local task.
server = tf.train.Server(cluster,
job_name=FLAGS.job_name,
task_index=FLAGS.task_index)
if FLAGS.job_name == "ps":
server.join()
稍微完整点的代码如下:
def main(_):
ps_hosts = FLAGS.ps_hosts.split(",")
worker_hosts = FLAGS.worker_hosts.split(",")
# Create a cluster from the parameter server and worker hosts.
cluster = tf.train.ClusterSpec({
"ps": ps_hosts, "worker": worker_hosts})
# Create and start a server for the local task.
server = tf.train.Server(cluster,
job_name=FLAGS.job_name,
task_index=FLAGS.task_index)
if FLAGS.job_name == "ps":
server.join()
elif FLAGS.job_name == "worker":
# 找出worker的主节点,即task_index为0的节点
is_chief = (FLAGS.task_index == 0)
# Assigns ops to the local worker by default.
with tf.device(tf.train.replica_device_setter(
worker_device="/job:worker/task:%d" % FLAGS.task_index,
cluster=cluster)):
# Compute
运行如下,:
# On ps0.example.com:
$ python trainer.py \
--ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \
--worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \
--job_name=ps --task_index=0
# On ps1.example.com:
$ python trainer.py \
--ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \
--worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \
--job_name=ps --task_index=1
# On worker0.example.com:
$ python trainer.py \
--ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \
--worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \
--job_name=worker --task_index=0
# On worker1.example.com:
$ python trainer.py \
--ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \
--worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \
--job_name=worker --task_index=1
0x03 TF-Operator
3.1 TF-Operator 设计思路
了解了 TF 分布式的大致运作,我们来看看 TF-Operator 设计思路。
以下是从 “Design Doc TFJob K8s CRD” 中翻译的。
目标是使在Kubernetes(K8s)上运行TensorFlow训练(尤其是分布式训练)变得容易。我建议通过创建一个K8s自定义资源描述符(CRD)和关联的控制器来实现这一点。CRD负责管理运行培训作业所需的K8s资源。
Kubernetes通过提供一个流程(而不是以VM为中心)的世界视图,使得管理流程变得更加容易。Kubernetes还为复杂的分布式应用程序提供了基本的构建块。例如,K8s提供对DNS、健康检查、日志收集、度量收集、存储等的内置支持。
在K8s中,控制器负责确保一套Pods是运行状态。Pod是K8s中的基本构建块,它描述了一个或多个应该进行共定位的进程(相同的ip)。K8s配备了许多内置控制器。可以确保N个pod以特定的规范运行。作业控制器可以用来运行二进制文件。
内置控制器不足以运行分布式TensorFlow作业。TensorFlow是一个有状态的应用程序;每个参数服务器和工作者都需要具有唯一的可寻址性,以支持所有不同的分布式培训模式。K8s有一个statefulset。 但是,有状态集用于永久运行的有状态服务(如Redis之类的内存分片缓存服务),而不是用于运行到完成的作业。
因此,今天在K8s上运行分布式TF作业意味着从内置原语中拼凑出一个解决方案。通常,这意味着手动管理多个资源。例如,用户可以为参数服务器创建一个有状态集,为工作者创建一个有状态集,为主服务器创建一个作业。
为了解决内置资源的限制,K8s支持自定义资源(CRD)和控制器。使用CRD,可以很容易地为特定工作负载创建具有所需语义的控制器,同时将用户隐藏在实现中。K8s社区很快就采用了这种模式,贡献了大量的CRD用于各种工作负载。
开发crd和各种控制器的K8s团队的意见是,大多数控制器使用非分布式、多线程设计,可伸缩性不是问题。
TFJob CRD为K8s定义了TFJob资源。
TFJob资源是 TfReplicas 的集合。每个TfReplica对应一个在工作中扮演角色的一组 TensorFlow processes;
我做出了一个明确的决定,不试图隐藏或替换K8s抽象。例如,每个TfReplica都包含一个标准的K8s PodTemplate 以指定要在每个复制副本中运行的进程(包括TF)。我这样做是因为K8s已经提供了一个被广泛采用和理解的API。因此,引入新的概念来代替K8s的概念是令人困惑的。此外,公开PodTemplate 使TFJob用户可以轻松地利用K8s特性。例如,TFJob用户可以使用K8s将卷附加到其TF进程。这使得TF与K8s支持的任何存储系统(如PDs、NFS等)结合使用变得非常容易。
3.2 架构图
具体架构图如下:
3.2.1 什么是Pod
我们从图上来看,先看中间的 pod 概念。
pod 是 k8s调度的最小单元。pod 可以理解为:容器组,同时pod相当于逻辑主机,进入pod后仿佛进入一个linux主机,命令都可用(linux系统下),该“主机”内又有很多容器,进入后又仿佛是又进了一个linux主机。默认情况下,每个容器的文件系统与其他容器完全隔离。每个pod都有自己的ip地址。pod内的容器共享相同的ip和端口空间。
3.2.2 为什么要有 service
首先,每个Pod都会被分配一个单独的IP地址,而且每个Pod都提供了一个独立的Endpoint(Pod IP + ContainerPort)以被客户端访问,但这种访问仅限于集群内部,外部没法访问集群内部的IP地址,
其次,Pod的生命是有限的,如果Pod重启IP很有可能会发生变化。当 controller 用新 Pod 替代发生故障的 Pod 时,新 Pod 会分配到新的 IP 地址。这样就产生了一个问题:如果一组 Pod 对外提供服务(比如 HTTP),它们的 IP 很有可能发生变化,那么客户端如何找到并访问这个服务呢?
Kubernetes 给出的解决方案是 Service。
Service只是一个抽象概念,Kubernetes Service 从逻辑上代表了一组 Pod,具体是哪些 Pod 则是由 label 来挑选。Service 在逻辑上将一组pod(功能相同)给抽象出来一个统一入口。可以将他简单理解为做了一个服务的负载均衡。
Service 有自己 IP,而且这个 IP 是不变的。客户端只需要访问 Service 的 IP,Kubernetes 则负责建立和维护 Service 与 Pod 的映射关系。无论后端 Pod 如何变化,对客户端不会有任何影响,因为 Service 没有变。所以一般会通过service来访问pod。core-dns会给service分配一个内部的虚拟ip,因此内部服务可以通过这个ip或者是serviceName来访问到pod的服务。
我们给出一个源码中的service 例子。
apiVersion: v1
kind: Service
metadata:
annotations:
prometheus.io/path: /metrics
prometheus.io/scrape: "true"
prometheus.io/port: "8443"
labels:
app: tf-job-operator
name: tf-job-operator
spec:
ports:
- name: monitoring-port
port: 8443
targetPort: 8443
selector:
name: tf-job-operator
type: ClusterIP
现在我们看到已经创建了名为tf-job-operator
的Service,会分配一个Cluster IP,该Service还会持续的监听selector下面的 Pod,会把这些Pod信息更新到一个名为 tf-job-operator 的Endpoints对象上去,这个对象就类似于我们上面说的Pod集合了。
3.2.3 什么是 controller
因为 Kubernetes 现有的资源类型无法满足我们的需求,因此需要通过 Custom Resource Definition 的机制进行扩展。
K8S中一切都是resource,比如Deployment,Service等等。
我们可以基于CRD(CustomResourceDefinitions)功能新增resource,比如我想自定义一种Deployment资源,提供不同的部署策略。
我们知道resource可以通过k8s的RESTFUL API进行CURD操作,对于CRD创建的resource也是一样的。
,类似于deployment controller等等,监听对应资源的CURD事件,做出对应的处理,比如部署POD。
其实,,我们下面也主要就是讲解这个 controller。
3.3 Spec
我们首先给出一个 Job Spec,这样大家可以在后续和代码中对应。样例如下,拥有一个 master,2个 workers,一个 PS。
apiVersion: "kubeflow.org/v1alpha1" # 指定api版本,此值必须在kubectl api-versions中
kind: "TFJob" # 指定创建资源的角色/类型
metadata: # 资源的元数据/属性
name: "example-job"
spec: # 资源规范字段
replicaSpecs: # 声明副本数目
- replicas: 1
tfReplicaType: MASTER
template: # 模版
spec:
containers:
- image: gcr.io/tf-on-k8s-dogfood/tf_sample:dc944ff # 容器使用的镜像地址
name: tensorflow
args:
- --log_dir=gs://my-job/log-dir
restartPolicy: OnFailure
- replicas: 2
tfReplicaType: WORKER
template:
spec:
containers:
- image: gcr.io/tf-on-k8s-dogfood/tf_sample:dc944ff
name: tensorflow
args:
- --log_dir=gs://my-job/log-dir
restartPolicy: OnFailure
- replicas: 1
tfReplicaType: PS
下面我们开始进入代码世界。
3.4 TFJob
首先我们看看 TFJob 的定义,大致可以和上面的 Spec 中找到对应关系,因为本文目的是了解其大略,所以我们就只分析这些即可。
// TFJob represents a TFJob resource.
type TFJob struct {
// Standard Kubernetes type metadata.
metav1.TypeMeta `json:",inline"`
// Standard Kubernetes object's metadata.
// +optional
metav1.ObjectMeta `json:"metadata,omitempty"`
// Specification of the desired state of the TFJob.
// +optional
Spec TFJobSpec `json:"spec,omitempty"`
// Most recently observed status of the TFJob.
// Populated by the system.
// Read-only.
// +optional
Status commonv1.JobStatus `json:"status,omitempty"`
}
// TFJobSpec is a desired state description of the TFJob.
type TFJobSpec struct {
// RunPolicy encapsulates various runtime policies of the distributed training
// job, for example how to clean up resources and how long the job can stay
// active.
RunPolicy commonv1.RunPolicy `json:"runPolicy,inline"`
// SuccessPolicy defines the policy to mark the TFJob as succeeded.
// Default to "", using the default rules.
// +optional
SuccessPolicy *SuccessPolicy `json:"successPolicy,omitempty"`
// A map of TFReplicaType (type) to ReplicaSpec (value). Specifies the TF cluster configuration.
// For example,
// {
// "PS": ReplicaSpec,
// "Worker": ReplicaSpec,
// }
TFReplicaSpecs map[commonv1.ReplicaType]*commonv1.ReplicaSpec `json:"tfReplicaSpecs"`
// // A switch to enable dynamic worker
EnableDynamicWorker bool `json:"enableDynamicWorker,omitempty"`
}
3.5 角色
其次我们看看 TF-Operator 之中,对 TF 角色的对应实现。
3.5.1 定义
首先是角色定义。这里的角色基本对应了 Tensorflow 的各个角色,包括很多为了兼容而保留的角色。
// setTypeNamesToCamelCase sets the name of all replica types from any case to correct case.
func setTypeNamesToCamelCase(tfJob *TFJob) {
setTypeNameToCamelCase(tfJob, TFReplicaTypePS)
setTypeNameToCamelCase(tfJob, TFReplicaTypeWorker)
setTypeNameToCamelCase(tfJob, TFReplicaTypeChief)
setTypeNameToCamelCase(tfJob, TFReplicaTypeMaster)
setTypeNameToCamelCase(tfJob, TFReplicaTypeEval)
}
const (
// TFReplicaTypePS is the type for parameter servers of distributed TensorFlow.
TFReplicaTypePS commonv1.ReplicaType = "PS"
// TFReplicaTypeWorker is the type for workers of distributed TensorFlow.
// This is also used for non-distributed TensorFlow.
TFReplicaTypeWorker commonv1.ReplicaType = "Worker"
// TFReplicaTypeChief is the type for chief worker of distributed TensorFlow.
// If there is "chief" replica type, it's the "chief worker".
// Else, worker:0 is the chief worker.
TFReplicaTypeChief commonv1.ReplicaType = "Chief"
// TFReplicaTypeMaster is the type for master worker of distributed TensorFlow.
// This is similar to chief, and kept just for backwards compatibility.
TFReplicaTypeMaster commonv1.ReplicaType = "Master"
// TFReplicaTypeEval is the type for evaluation replica in TensorFlow.
TFReplicaTypeEval commonv1.ReplicaType = "Evaluator"
)
3.5.2 创建角色
NewTFJobV2 函数就是依据配置的不同,来创建不同的角色。
这里可以看到,生成 job 时候,基本就是按照 spec 的对应字段来处理。
apiVersion: "kubeflow.org/v1alpha1"
kind: "TFJob"
metadata:
name: "example-job"
spec:
replicaSpecs:
下面是函数定义。
func NewTFJobV2(worker, ps, master, cheif, evaluator int) *tfv1.TFJob {
tfJob := &tfv1.TFJob{
TypeMeta: metav1.TypeMeta{
Kind: tfv1.Kind,
},
ObjectMeta: metav1.ObjectMeta{
Name: TestTFJobName,
Namespace: metav1.NamespaceDefault,
},
Spec: tfv1.TFJobSpec{
TFReplicaSpecs: make(map[commonv1.ReplicaType]*commonv1.ReplicaSpec),
},
}
tfv1.SetObjectDefaults_TFJob(tfJob)
if worker > 0 {
worker := int32(worker)
workerReplicaSpec := &commonv1.ReplicaSpec{
Replicas: &worker,
Template: NewTFReplicaSpecTemplate(),
}
tfJob.Spec.TFReplicaSpecs[tfv1.TFReplicaTypeWorker] = workerReplicaSpec
}
if ps > 0 {
ps := int32(ps)
psReplicaSpec := &commonv1.ReplicaSpec{
Replicas: &ps,
Template: NewTFReplicaSpecTemplate(),
}
tfJob.Spec.TFReplicaSpecs[tfv1.TFReplicaTypePS] = psReplicaSpec
}
if master > 0 {
master := int32(master)
masterReplicaSpec := &commonv1.ReplicaSpec{
Replicas: &master,
Template: NewTFReplicaSpecTemplate(),
}
tfJob.Spec.TFReplicaSpecs[tfv1.TFReplicaTypeMaster] = masterReplicaSpec
}
if cheif > 0 {
cheif := int32(cheif)
cheifReplicaSpec := &commonv1.ReplicaSpec{
Replicas: &cheif,
Template: NewTFReplicaSpecTemplate(),
}
tfJob.Spec.TFReplicaSpecs[tfv1.TFReplicaTypeChief] = cheifReplicaSpec
}
if evaluator > 0 {
evaluator := int32(evaluator)
evaluatorReplicaSpec := &commonv1.ReplicaSpec{
Replicas: &evaluator,
Template: NewTFReplicaSpecTemplate(),
}
tfJob.Spec.TFReplicaSpecs[tfv1.TFReplicaTypeChief] = evaluatorReplicaSpec
}
return tfJob
}
3.5.3 如何区分 master
用如下方法区分 master。
func (tc *TFController) IsMasterRole(replicas map[commonv1.ReplicaType]*commonv1.ReplicaSpec, rtype commonv1.ReplicaType, index int) bool {
if ContainChieforMasterSpec(replicas) {
return rtype == tfv1.TFReplicaTypeChief || rtype == tfv1.TFReplicaTypeMaster
}
// else check if it is worker with index 0
return rtype == tfv1.TFReplicaTypeWorker && index == 0
}
0x04 Contoller
下面就进入正题,看看 Controller 如何实现。
4.1 K8S CRD关键概念
首先我们需要看看 K8S CRD 的一些关键概念。
-
informer:监听apiserver中特定资源变化,然后会存储到一个线程安全的local cache中,最后回调我们自己实现的event handler。
-
local cache:informer实时同步apiserver(也就是etcd)中的数据到内存中存储,可以有效降低apiserver的查询压力,但缺点就是实时性不好,本地会比远程的数据落后一点点但会最终与etcd一致,所以需要根据情况具体分析是走Local cache还是apiserver实时获取数据。
-
Lister:提供了CURD操作访问local cache。
-
controller:一