文章目录
-
- 安装
- RESTClient,DynamicClient和ClientSet Demo
- 介绍基本操作
-
- 连接 API Server
- 创建一个clientset
- 获取集群的PVC列表
- 监听集群中pvc
-
- 启动监控功能
- 循环事件
- 处理ADDED事件
- 处理DELETED事件
- 运行程序
- client-go 进入pod执行命令
-
- 进入单个 pod 执行命令
- 进入多个 pod 执行命令
- client-go增删改查crd
-
- 示例 CRD
- list 资源
- get 资源
- create 资源
- update 资源
- patch 资源
- delete 资源
- client-go 输出资源
-
- 打印pod详细信息
- client-go管理pvc
-
- 通过label删除标签筛选pvc
- client-go管理namespace
- 常用api
-
- deployment
- service
- pod
安装
go get k8s.io/client-go@v0.20.8 go get k8s.io/apimachinery/pkg/apis/meta/v1
注意版本对应: client-go github
若遇到一些包没有, 执行 go mod tidy。
RESTClient,DynamicClient和ClientSet Demo
在Kubernetes上,通常需要Client来访问Kubernetes目前最常用的对象是RESTClient, DynamicClient和ClientSet这三种Client。今天先介绍一下这三个Client基本意义和一般用法。
K8s二开之 client-go 初探
介绍基本操作
连接 API Server
我们Go client第一步是建立一个和API Server连接。为了做到这一点,我们包中使用它clientcmd,如下代码所示:
import ( ... "k8s.io/client-go/tools/clientcmd" ) func main() {
kubeconfig := filepath.Join( os.Getenv("HOME"), ".kube", "config", ) config, err := clientcmd.BuildConfigFromFlags("", kubeconfig) if err != nil {
log.Fatal(err) } ... }
_Client-go_从不同的上下文中提供物理功能来获取您的配置,从而使其成为一项不重要的任务。
你能从kubeconfig连接文件启动配置API server。当你的代码在集群外运行时,这是一个理想的解决方案。clientcmd.BuildConfigFromFlags(“”, configFile)
当您的代码在集群中运行时,您可以使用上述函数而不使用任何参数,该函数将通过集群信息连接api server。
clientcmd.BuildConfigFromFlags("", "")
或者我们可以通过rest用集群中的信息配置启动包创建一个包。k8s里所有的Pod都会以Volume自动挂载k8s里面默认的ServiceAccount,所以会使用默认ServiceAccount授权信息),如下:
import "k8s.io/client-go/rest" ... rest.InClusterConfig()
创建一个clientset
我们需要创建一个序列化client让我们得到它API对象。在kubernetes包中的Clientset类型定义提供了公开访问API对象序列化client,如下:
type Clientset struct {
*authenticationv1beta1.AuthenticationV1beta1Client *authorizationv1.AuthorizationV1Client ... *corev1.CoreV1Client }
一旦我们有了正确的配置连接,我们就可以使用这个配置来初始化clientset,如下:
func main() {
config, err := clientcmd.BuildConfigFromFlags(""
, kubeconfig)
... clientset
, err
:= kubernetes.NewForConfig(config) if err
!= nil
{
log.Fatal(err)
}
}
对于我们的例子,我们使用的是v1的API对象。下一步,我们要使用clientset通过CoreV1()去访问核心api资源,如下:
func main() {
...
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
log.Fatal(err)
}
api := clientset.CoreV1()
}
获取集群的PVC列表
我们对clientset执行的最基本操作之一获取存储的API对象的列表。在我们的例子中,我们将要拿到一个namespace下面的pvc列表,如下:
import (
...
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
func main() {
var ns, label, field string
flag.StringVar(&ns, "namespace", "", "namespace")
flag.StringVar(&label, "l", "", "Label selector")
flag.StringVar(&field, "f", "", "Field selector")
...
api := clientset.CoreV1()
// setup list options
listOptions := metav1.ListOptions{
LabelSelector: label,
FieldSelector: field,
}
pvcs, err := api.PersistentVolumeClaims(ns).List(listOptions)
if err != nil {
log.Fatal(err)
}
printPVCs(pvcs)
...
}
在上面的代码中,我们使用ListOptions指定 label 和 field selectors (还有namespace)来缩小pvc列表的范围,这个结果的返回类型是v1.PeristentVolumeClaimList。下面的这个代码展示了我们如何去遍历和打印从api server中获取的pvc列表。
func printPVCs(pvcs *v1.PersistentVolumeClaimList) {
template := "%-32s%-8s%-8s\n"
fmt.Printf(template, "NAME", "STATUS", "CAPACITY")
for _, pvc := range pvcs.Items {
quant := pvc.Spec.Resources.Requests[v1.ResourceStorage]
fmt.Printf(
template,
pvc.Name,
string(pvc.Status.Phase),
quant.String())
}
}
监听集群中pvc
k8s的Go client框架支持为指定的API对象在其生命周期事件中监听集群的能力,包括创建,更新,删除一个指定对象时候触发的CREATED,MODIFIED,DELETED事件。对于我们的命令行工具,我们将要监听在集群中已经声明的PVC的总量。
对于某一个namespace,当pvc的容量到达了某一个阈值(比如说200Gi),我们将会采取某个动作。为了简单起见,我们将要在屏幕上打印个通知。但是在更复杂的实现中,可以使用相同的办法触发一个自动操作。
启动监听功能
现在让我们为PersistentVolumeClaim这个资源通过Watch去创建一个监听器。然后这个监听器通过ResultChan从go的channel中访问事件通知。
func main() {
...
api := clientset.CoreV1()
listOptions := metav1.ListOptions{
LabelSelector: label,
FieldSelector: field,
}
watcher, err :=api.PersistentVolumeClaims(ns).Watch(listOptions)
if err != nil {
log.Fatal(err)
}
ch := watcher.ResultChan()
...
}
循环事件
接下来我们将要处理资源事件。但是在我们处理事件之前,我们先声明resource.Quantity类型的的两个变量为maxClaimsQuant和totalClaimQuant来分别表示我们的申请资源阈值和运行总数。
import(
"k8s.io/apimachinery/pkg/api/resource"
...
)
func main() {
var maxClaims string
flag.StringVar(&maxClaims, "max-claims", "200Gi", "Maximum total claims to watch")
var totalClaimedQuant resource.Quantity
maxClaimedQuant := resource.MustParse(maxClaims)
...
ch := watcher.ResultChan()
for event := range ch {
pvc, ok := event.Object.(*v1.PersistentVolumeClaim)
if !ok {
log.Fatal("unexpected type")
}
...
}
}
在上面的for-range循环中,watcher的channel用于处理来自服务器传入的通知。每个事件赋值给变量event,并且event.Object的类型被声明为PersistentVolumeClaim类型,所以我们能从中提取出来。
处理ADDED事件
当一个新的PVC创建的时候,event.Type的值被设置为watch.Added。然后我们用下面的代码去获取新增的声明的容量(quant),将其添加到正在运行的总容量中(totalClaimedQuant)。最后我们去检查是否当前的容量总值大于当初设定的最大值(maxClaimedQuant),如果大于的话我们就触发一个事件。
import(
"k8s.io/apimachinery/pkg/watch"
...
)
func main() {
...
for event := range ch {
pvc, ok := event.Object.(*v1.PersistentVolumeClaim)
if !ok {
log.Fatal("unexpected type")
}
quant := pvc.Spec.Resources.Requests[v1.ResourceStorage]
switch event.Type {
case watch.Added:
totalClaimedQuant.Add(quant)
log.Printf("PVC %s added, claim size %s\n",
pvc.Name, quant.String())
if totalClaimedQuant.Cmp(maxClaimedQuant) == 1 {
log.Printf(
"\nClaim overage reached: max %s at %s",
maxClaimedQuant.String(),
totalClaimedQuant.String())
// trigger action
log.Println("*** Taking action ***")
}
}
...
}
}
}
处理DELETED事件
代码也会在PVC被删除的时候做出反应,它执行相反的逻辑以及把被删除的这个PVC申请的容量在正在运行的容量的总值里面减去。
func main() {
...
for event := range ch {
...
switch event.Type {
case watch.Deleted:
quant := pvc.Spec.Resources.Requests[v1.ResourceStorage]
totalClaimedQuant.Sub(quant)
log.Printf("PVC %s removed, size %s\n",
pvc.Name, quant.String())
if totalClaimedQuant.Cmp(maxClaimedQuant) <= 0 {
log.Printf("Claim usage normal: max %s at %s",
maxClaimedQuant.String(),
totalClaimedQuant.String(),
)
// trigger action
log.Println("*** Taking action ***")
}
}
...
}
}
运行程序
当程序在一个运行中的集群被执行的时候,首先会列出PVC的列表。然后开始监听集群中新的PersistentVolumeClaim事件。
$> ./pvcwatch
Using kubeconfig: /Users/vladimir/.kube/config
--- PVCs ----
NAME STATUS CAPACITY
my-redis-redis Bound 50Gi
my-redis2-redis Bound 100Gi
-----------------------------
Total capacity claimed: 150Gi
-----------------------------
--- PVC Watch (max claims 200Gi) ----
2018/02/13 21:55:03 PVC my-redis2-redis added, claim size 100Gi
2018/02/13 21:55:03
At 50.0% claim capcity (100Gi/200Gi)
2018/02/13 21:55:03 PVC my-redis-redis added, claim size 50Gi
2018/02/13 21:55:03
At 75.0% claim capcity (150Gi/200Gi)
下面让我们部署一个应用到集群中,这个应用会申请75Gi容量的存储。(例如,让我们通过helm去部署一个实例influxdb)。
helm install --name my-influx \
--set persistence.enabled=true,persistence.size=75Gi stable/influxdb
正如下面你看到的,我们的工具立刻反应出来有个新的声明以及一个警告因为当前的运行的声明总量已经大于我们设定的阈值。
--- PVC Watch (max claims 200Gi) ----
...
2018/02/13 21:55:03
At 75.0% claim capcity (150Gi/200Gi)
2018/02/13 22:01:29 PVC my-influx-influxdb added, claim size 75Gi
2018/02/13 22:01:29
Claim overage reached: max 200Gi at 225Gi
2018/02/13 22:01:29 *** Taking action ***
2018/02/13 22:01:29
At 112.5% claim capcity (225Gi/200Gi)
相反,从集群中删除一个PVC的时候,该工具会相应展示提示信息。
...
At 112.5% claim capcity (225Gi/200Gi)
2018/02/14 11:30:36 PVC my-redis2-redis removed, size 100Gi
2018/02/14 11:30:36 Claim usage normal: max 200Gi at 125Gi
2018/02/14 11:30:36 *** Taking action ***
client-go 进入pod执行命令
进入单个 pod 执行命令
package main
import (
"flag"
"fmt"
"io"
"os"
"path/filepath"
"golang.org/x/crypto/ssh/terminal"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/remotecommand"
"k8s.io/client-go/util/homedir"
)
func main() {
var kubeconfig *string
if home := homedir.HomeDir(); home != "" {
kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")} else {
kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
}
flag.Parse()
config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
if err != nil {
panic(err)
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
panic(err)
}
req := clientset.CoreV1().RESTClient().Post().
Resource("pods").
Name("myapp-d46f5678b-m98p2").
Namespace("default").
SubResource("exec").
VersionedParams(&corev1.PodExecOptions{
Command: []string{
"echo", "hello world"},
Stdin: true,
Stdout: true,
Stderr: true,
TTY: false,
}, scheme.ParameterCodec)
exec, err := remotecommand.NewSPDYExecutor(config, "POST", req.URL())
if !terminal.IsTerminal(0) || !terminal.IsTerminal(1) {
fmt.Errorf("stdin/stdout should be terminal")
}
oldState, err := terminal.MakeRaw(0)
if err != nil {
fmt.Println(err)
}
defer terminal.Restore(0, oldState)
screen := struct {
io.Reader
io.Writer
}{
os.Stdin, os.Stdout}
if err = exec.Stream(remotecommand.StreamOptions{
Stdin: screen,
Stdout: screen,
Stderr: screen,
Tty: false,
}); err != nil {
fmt.Print(err)
}
}
$ go run execpod.go
hello world
进入多个 pod 执行命令
package main
import (
"bufio"
"fmt"
"io"
"github.com/spf13/pflag"
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/remotecommand"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
type App struct {
Config *restclient.Config
Namespace string
PodName string
}
func NewApp(namespace string, podName string) *App {
config := LoadKubernetesConfig()
return &App{
Config: config, Namespace: namespace, PodName: podName}
}
func LoadKubernetesConfig() *restclient.Config {
kubeconfig := pflag.Lookup("kubefile").Value.String()
// uses the current context in kubeconfig
config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
if err != nil {
panic(err.Error())
}
return config
}
func ExecCommandInPodContainer(config *restclient.Config, namespace string, podName string, containerName string,
command string) (string, error) {
client, err := kubernetes.NewForConfig(config)
reader, writer := io.Pipe()
var cmdOutput string
go func() {
scanner := bufio.NewScanner(reader)
for scanner.Scan() {
line := scanner.Text()
cmdOutput += fmt.Sprintln(line)
}
}()
stdin := reader
stdout := writer
stderr := writer
tty := false
cmd := []string{
"bash",
"-c",
command,
}
req := client.CoreV1().RESTClient().Post().Resource("pods").Name(podName).
Namespace(namespace).SubResource("exec")
option := &v1.PodExecOptions{
Command: cmd,
Container: containerName,
Stdin: stdin != nil,
Stdout: stdout != nil,
Stderr: stderr != nil,
TTY: tty,
}
req.VersionedParams(
option,
scheme.ParameterCodec,
)
exec, err := remotecommand.NewSPDYExecutor(config, "POST", req.URL())
if err != nil {
return "", err
}
err = exec.Stream(remotecommand.StreamOptions{
Stdin: stdin,
Stdout: stdout,
Stderr: stderr,
Tty: tty,
})
if err != nil {
return "", err
}
return cmdOutput, nil
}
func (orch *App) GetClusterInfo() (string, error) {
// check clusters info
result, err := ExecCommandInPodContainer(orch.Config, orch.Namespace, orch.PodName, "nginx", "echo `date +%Y%m%d-%H:%M` hello wold")
if err != nil {
fmt.Println("Error occoured" + err.Error())
}
return result, nil
}
func main() {
pflag.String("kubefile", "/root/.kube/config", "Kube file to load")
pflag.String("namespace", "default", "App Namespace")
pflag.Parse()
appNamespace := pflag.Lookup("namespace").Value.String()
config := LoadKubernetesConfig()
client, err := kubernetes.NewForConfig(config)
if err != nil {
fmt.Println(err.Error())
}
appPodList, err := client.CoreV1().Pods(appNamespace).List(metav1.ListOptions{
})
if err != nil {
fmt.Println(err.Error())
}
for _, pod := range appPodList.Items {
app := NewApp(pod.ObjectMeta.Namespace, pod.ObjectMeta.Name)
result, err := app.GetClusterInfo()
if err != nil {
fmt.Println(err.Error())
}
fmt.Println(result)
}
}
$ go run test1.go
20201216-10:13 hello wold
20201216-10:13 hello wold
client-go增删改查crd
示例 CRD
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
# name must match the spec fields below, and be in the form: <plural>.<group>
name: crontabs.stable.example.com
spec:
# group name to use for REST API: /apis/<group>/<version>
group: stable.example.com
# list of versions supported by this CustomResourceDefinition
versions:
- name: v1
# Each version can be enabled/disabled by Served flag.
served: true
# One and only one version must be marked as the storage version.
storage: true
schema:
openAPIV3Schema:
type: object
properties:
spec:
type: object
properties:
cronSpec:
type: string
image:
type: string
replicas:
type: integer
# either Namespaced or Cluster
scope: Namespaced
names:
# plural name to be used in the URL: /apis/<group>/<version>/<plural>
plural: crontabs
# singular name to be used as an alias on the CLI and for display
singular: crontab
# kind is normally the CamelCased singular type. Your resource manifests use this.
kind: CronTab
# shortNames allow shorter string to match your resource on the CLI
shortNames:
- ct
通过 kubectl 创建一下这个 crd ,然后再创建几个 crd 对象
list 资源
首先是如何 list 前面创建的3个资源,类似 kubectl get crontab.stable.example.com 的效果。
简单来说就是通过 k8s.io/client-go/dynamic 里的 Interface 提供的方法来操作 crd 资源。 关键是怎么拿到 NamespaceableResourceInterface 实例以及把结果转换为自定义的结构体。
package main
import (
"encoding/json"
"fmt"
"os"
"path/filepath"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/tools/clientcmd"
)
var gvr = schema.GroupVersionResource{
Group: "stable.example.com",
Version: "v1",
Resource: "crontabs",
}
type CrontabSpec struct {
CronSpec string `json:"cronSpec"`
Image string `json:"image"`
}
type Crontab struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
Spec CrontabSpec `json:"spec,omitempty"`
}
type CrontabList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata,omitempty"`
Items []Crontab `json:"items"`
}
func listCrontabs(client dynamic.Interface, namespace string) (*CrontabList, error) {
list, err := client.Resource(gvr).Namespace(namespace).List(metav1.ListOptions{
})
if err != nil {
return nil, err
}
data, err := list.MarshalJSON()
if err != nil {
return nil, err
}
var ctList CrontabList
if err := json.Unmarshal(data, &ctList); err != nil {
return nil, err
}
return &ctList, nil
}
func main() {
kubeconfig := filepath.Join(os.Getenv("HOME"), ".kube", "config")
config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
if err != nil {
panic(err)
}
client, err := dynamic.NewForConfig(config)
if err != nil {
panic(err)
}
list, err := listCrontabs(client, "default")
if err != nil {
panic(err)
}
for _, t := range list.Items {
fmt.Printf("%s %s %s %s\n", t.Namespace, t.Name, t.Spec.CronSpec, t.Spec.Image)
}
}
代码相对来说比较简单,有一个要注意的地方就是 gvr 里各个字段的值来自 crd 定义的 yaml 文件:
spec:
# group name to use for REST API: /apis/<group>/<version>
# 对应 Group 字段的值
group: stable.example.com
# list of versions supported by this CustomResourceDefinition
versions:
- name: v1 # 对应 Version 字段的可选值
# ...
names:
# plural name to be used in the URL: /apis/<group>/<version>/<plural>
# 对应 Resource 字段的值
plural: crontabs
注意:因为这个 crd 定义的是 namespace 资源,如果是非 namespace 资源的话,应当改为使用不指定 namespace 的方法:
client.Resource(gvr).List(metav1.ListOptions{
})
get 资源
get 资源的方法也是通过 dynamic.Interface 来实现,关键是怎么把结果转换为上面定义的结构体, 关键代码示例如下: get.go
package main
import (
"encoding/json"
"fmt"
"os"
"path/filepath"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/tools/clientcmd"
)
var gvr = schema.GroupVersionResource{
Group: "stable.example.com",
Version: "v1",
Resource: "crontabs",
}
type CrontabSpec struct {
CronSpec string `json:"cronSpec"`
Image string `json:"image"`
}
type Crontab struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
Spec CrontabSpec `json:"spec,omitempty"`
}
type CrontabList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata,omitempty"`
Items []Crontab `json:"items"`
}
func getCrontab(client dynamic.Interface, namespace string, name string) (*Crontab, error) {
utd, err := client.Resource(gvr).Namespace(namespace).Get(name, metav1.GetOptions{
})
if err != nil {
return nil, err
}
data, err := utd.MarshalJSON()
if err != nil {
return nil, err
}
var ct Crontab
if err := json.Unmarshal(data, &ct); err != nil {
return nil, err
}
return &ct, nil
}
func main() {
kubeconfig := filepath.Join(os.Getenv("HOME"), ".kube", "config")
config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
if err != nil {
panic(err)
}
client, err := dynamic.NewForConfig(config)
if err != nil {
panic(err)
}
ct, err := getCrontab(client, "default", "cron-1")
if err != nil {
panic(err)
}
fmt.Printf("%s %s %s %s\n", ct.Namespace, ct.Name, ct.Spec.CronSpec, ct.Spec.Image)
}
执行效果:
$ go run main.go
default cron-1 * * * * */5 my-awesome-cron-image-1
create 资源
create 资源的方法也是通过 dynamic.Interface 来实现 ,这里主要记录一下怎么基于 yaml 文本的内容来创建资源。
关键代码示例如下: create.go
package main import ( "encoding/json" "fmt" "os" "path/filepath" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/dynamic" "k8s.io/client-go/tools/clientcmd" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/serializer/yaml" ) var gvr = schema.GroupVersionResource{ Group: "stable.example.com", Version: "v1", Resource: "crontabs", } type CrontabSpec struct { CronSpec string `json:"cronSpec"` Image string `json:"image"` } type Crontab struct { metav1.TypeMeta `json:",inline"` metav1.ObjectMeta `json:"metadata,omitempty"` Spec CrontabSpec `json:"spec,omitempty"` } type CrontabList struct { metav1.TypeMeta `json:" 标签:
m33连接器