
9. Redis Operator (2): Sentinel Deployment

mhr18 2025-07-27 22:27

0. Introduction

In the previous post, we used a standalone Redis deployment to learn the basics of writing an Operator. Today we build on that and deploy Redis in Sentinel mode.

Sentinel exists to make a Redis deployment highly available. Broadly, there are three common ways to achieve high availability:

  • Redis Sentinel: run an odd number (≥3) of Sentinel nodes; when the master fails, the Sentinels elect a new master from the remaining replicas, completing failover and keeping the service running. Its operational complexity is low, which makes it a good fit for small and medium deployments.
  • Redis Cluster: shards the data by hash, with each shard having its own master and replicas and writes spread across the shard masters. It is a decentralized scheme suited to medium and large deployments.
  • Third-party VIP schemes: the easiest to picture — expose a virtual IP for the deployment, and have an external monitor keep the VIP pointed at the current master (read-write) while replicas serve reads.
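Since we rely on Sentinel below, it is worth pinning down the arithmetic behind the "odd number ≥3" rule: a majority of sentinels must agree that the master is down before failover starts, so an even count buys no extra failure tolerance over the odd count below it. A minimal sketch (the helper name is ours, not Redis terminology):

```go
package main

import "fmt"

// quorum returns the majority of sentinels needed to agree on
// objective-down and to elect the sentinel that performs failover.
func quorum(sentinels int) int {
	return sentinels/2 + 1
}

func main() {
	for _, n := range []int{3, 4, 5} {
		fmt.Printf("%d sentinels -> majority %d, tolerates %d sentinel failure(s)\n",
			n, quorum(n), n-quorum(n))
	}
}
```

Note that 4 sentinels tolerate exactly as many failures as 3, which is why odd counts are the convention.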

Of these, Redis Sentinel is the best fit for our learning cluster (which in reality is a single host). The overall scheme looks like this:

graph LR
    %% Client layer
    Client1([Client<br/>App1])
    Client2([Client<br/>App2])
    ClientN([Client group<br/>AppN])

    %% Sentinel cluster layer
    subgraph Sentinel Cluster
        direction LR
        S1[Sentinel node 1]
        S2[Sentinel node 2]
        S3[Sentinel node 3]

        S1<-->|gossip<br/>PING/PONG|S2
        S2<-->|gossip<br/>PING/PONG|S3
        S1<-->|gossip<br/>PING/PONG|S3
    end

    %% Redis data layer
    subgraph Redis data nodes
        direction BT
        Master([Master])
        Slave1([Slave 1])
        Slave2([Slave 2])

        Master==replication<br/>SYNC==>Slave1
        Master==replication<br/>SYNC==>Slave2
    end

    %% Monitoring
    S1-.-|health check<br/>PING every 1s|Master
    S2-.-|health check<br/>PING every 1s|Master
    S3-.-|health check<br/>PING every 1s|Master

    %% Client access paths
    Client1-->|1. query master address|S1
    S1-->|2. return master address|Client1
    Client1==>|3. direct read/write|Master

    Client2-->|1. query master address|S2
    S2-->|2. return slave address|Client2
    Client2==>|3. read-only access|Slave1

    %% Failover channel
    S1===|elect leader sentinel|S2
    S2===|perform failover|Slave1
    Slave1-.->|promoted to new master|Master

In our example we will implement the Sentinel setup shown above: three sentinel nodes and three redis nodes.

1. Development Environment

The development environment is the same as in the previous post, but the kind cluster configuration changes to the following:

kind: Cluster
apiVersion: kind.x-k8s.io/v1alpha4
nodes:
- role: control-plane
  extraPortMappings:
  - containerPort: 30950
    hostPort: 80
    listenAddress: "127.0.0.1"
    protocol: TCP
  - containerPort: 30999
    hostPort: 6378
    protocol: TCP
  - containerPort: 31000
    hostPort: 6379
    protocol: TCP
  - containerPort: 31001
    hostPort: 26379
    protocol: TCP

- role: worker
  extraMounts:
  - hostPath: /Users/chenyiguo/workspace/k8s/kind/multi_data/worker1
    containerPath: /data
  labels:
    iguochan.io/redis-node: redis1
- role: worker
  extraMounts:
  - hostPath: /Users/chenyiguo/workspace/k8s/kind/multi_data/worker2
    containerPath: /data
  labels:
    iguochan.io/redis-node: redis2
- role: worker
  extraMounts:
  - hostPath: /Users/chenyiguo/workspace/k8s/kind/multi_data/worker3
    containerPath: /data
  labels:
    iguochan.io/redis-node: redis3

Once the Sentinel cluster is up we will verify it with redis-cli, so we also expose the master's port for write operations (this is not proper practice: clients should really ask the sentinel cluster for the current master address and connect to that address). Host port 6378 serves as that master access port.

We also expose 6379 as the cluster's external redis read port and 26379 as the sentinel port.

Finally, we give each node a distinct label so that the pods of the StatefulSets used later are scheduled onto their matching nodes. Each node's storage and configuration should differ, and Redis is not a stateless service, so in my (admittedly shallow) understanding each pod should stick to its own machine. (This was not deeply thought through; if you have a better approach, please share it in the comments.)
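To make the pinning concrete: the controller later derives the node-label value for each PersistentVolume from the pod ordinal with `i%3+1` (see `reconcilePersistentVolumes` below). A standalone sketch of that mapping, with a function name of our own invention:

```go
package main

import "fmt"

// nodeLabelFor maps a StatefulSet pod ordinal to the iguochan.io/redis-node
// label value set in the kind config (redis1..redis3), wrapping around
// if more than three replicas are requested.
func nodeLabelFor(ordinal int) string {
	return fmt.Sprintf("redis%d", ordinal%3+1)
}

func main() {
	for i := 0; i < 5; i++ {
		fmt.Printf("pod-%d -> iguochan.io/redis-node=%s\n", i, nodeLabelFor(i))
	}
}
```

The PV's node affinity then requires this label, so the pod that claims the PV is forced onto the matching worker.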

2. Operator Development

2.1 Creating the API

We create the API on top of the existing project:

$ kubebuilder create api --group cache --version v1 --kind RedisSentinel

2.2 Implementing the Controller

First we need to settle on a design, sketched below. A RedisSentinel CR manages the whole cluster: three different Services expose the master port, the read port, and the sentinel port discussed above, and StatefulSets manage the redis and sentinel pods.

graph TD
    subgraph Kubernetes
        Operator[Operator controller] -->|manages| RedisSentinelCR[RedisSentinel CR]

        RedisSentinelCR -->|creates| RedisCluster[Redis cluster]
        RedisSentinelCR -->|creates| SentinelCluster[Sentinel cluster]

        subgraph Redis cluster
            RedisMaster[Redis Master]
            RedisSlave1[Redis Slave 1]
            RedisSlave2[Redis Slave 2]

            RedisMaster -->|replication| RedisSlave1
            RedisMaster -->|replication| RedisSlave2
        end

        subgraph Sentinel cluster
            Sentinel1[Sentinel 1]
            Sentinel2[Sentinel 2]
            Sentinel3[Sentinel 3]

            Sentinel1 -->|monitors| RedisMaster
            Sentinel2 -->|monitors| RedisMaster
            Sentinel3 -->|monitors| RedisMaster
        end

        Services[Service exposure]
        Services --> RedisMasterService[Master service:6378]
        Services --> RedisSlaveService[Slave service:6379]
        Services --> SentinelService[Sentinel service:26379]

        RedisMasterService --> RedisMaster
        RedisSlaveService --> RedisSlave1 & RedisSlave2
        SentinelService --> Sentinel1 & Sentinel2 & Sentinel3
    end

    Client[Client app] -->|write requests| RedisMasterService
    Client -->|read requests| RedisSlaveService
    Client -->|query master| SentinelService

2.2.1 Defining the CRD

package v1

import (
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

type RedisSentinelSpec struct {
	Image            string                   `json:"image,omitempty"`
	SentinelImage    string                   `json:"sentinelImage,omitempty"`
	RedisReplicas    int32                    `json:"redisReplicas,omitempty"`
	SentinelReplicas int32                    `json:"sentinelReplicas,omitempty"`
	MasterNodePort   int32                    `json:"masterNodePort,omitempty"`
	NodePort         int32                    `json:"nodePort,omitempty"`
	SentinelNodePort int32                    `json:"sentinelNodePort,omitempty"`
	Storage          RedisSentinelStorageSpec `json:"storage,omitempty"`
}

type RedisSentinelStorageSpec struct {
	Size     resource.Quantity `json:"size,omitempty"`
	HostPath string            `json:"hostPath,omitempty"`
}

type RedisSentinelStatus struct {
	Phase              RedisPhase  `json:"phase,omitempty"`
	Endpoint           string      `json:"endpoint,omitempty"`
	SentinelEndpoint   string      `json:"sentinelEndpoint,omitempty"`
	Master             string      `json:"master,omitempty"`
	LastRoleUpdateTime metav1.Time `json:"lastRoleUpdateTime,omitempty"`
}

type RedisSentinel struct {
	metav1.TypeMeta   `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty"`

	Spec   RedisSentinelSpec   `json:"spec,omitempty"`
	Status RedisSentinelStatus `json:"status,omitempty"`
}

type RedisSentinelList struct {
	metav1.TypeMeta `json:",inline"`
	metav1.ListMeta `json:"metadata,omitempty"`
	Items           []RedisSentinel `json:"items"`
}

func init() {
	SchemeBuilder.Register(&RedisSentinel{}, &RedisSentinelList{})
}

The above defines the RedisSentinel CRD. Compared with RedisStandalone it has grown quite a bit, adding among other things the Sentinel image and the three ports discussed earlier.
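For reference, a CR exercising this Spec might look as follows. The field names come from the structs above; the group domain (`iguochan.io`) and the concrete values are our assumptions, based on the node labels and webhook defaults used elsewhere in this post:

```yaml
apiVersion: cache.iguochan.io/v1
kind: RedisSentinel
metadata:
  name: rs-demo
spec:
  image: redis:7.0
  sentinelImage: redis:7.0
  redisReplicas: 3
  sentinelReplicas: 3
  masterNodePort: 30999
  nodePort: 31000
  sentinelNodePort: 31001
  storage:
    size: 1Gi
    hostPath: /data
```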

2.2.2 Implementing the controller

package controller

import (
	"context"
	"fmt"
	"strings"
	"time"

	cachev1 "github.com/IguoChan/redis-operator/api/v1"
	"github.com/go-redis/redis"
	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/intstr"
	"k8s.io/client-go/tools/record"
	"k8s.io/utils/pointer"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/log"
)

type RedisSentinelReconciler struct {
	client.Client
	Scheme   *runtime.Scheme
	Recorder record.EventRecorder
}

const (
	MasterPort   = 6378
	SentinelPort = 26379
)

func (r *RedisSentinelReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	logger := log.FromContext(ctx)
	logger.Info("Reconciling RedisSentinel", "request", req.NamespacedName)

	redisSentinel := &cachev1.RedisSentinel{}
	if err := r.Get(ctx, req.NamespacedName, redisSentinel); err != nil {
		if errors.IsNotFound(err) {
			return ctrl.Result{RequeueAfter: 30 * time.Second}, nil
		}
		return ctrl.Result{RequeueAfter: 30 * time.Second}, err
	}

	if r.checkPodDeletion(ctx, redisSentinel) {
		logger.Info("Detected pod deletion, waiting for failover")
		return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
	}

	if err := r.reconcileRedisStatefulSet(ctx, redisSentinel); err != nil {
		logger.Error(err, "Failed to reconcile Redis StatefulSet")
		return ctrl.Result{RequeueAfter: 30 * time.Second}, r.updateStatus(ctx, redisSentinel, cachev1.RedisPhaseError, err)
	}

	if err := r.reconcileSentinelStatefulSet(ctx, redisSentinel); err != nil {
		logger.Error(err, "Failed to reconcile Sentinel StatefulSet")
		return ctrl.Result{RequeueAfter: 30 * time.Second}, r.updateStatus(ctx, redisSentinel, cachev1.RedisPhaseError, err)
	}

	if err := r.reconcileRedisService(ctx, redisSentinel); err != nil {
		logger.Error(err, "Failed to reconcile Redis Service")
		return ctrl.Result{RequeueAfter: 30 * time.Second}, r.updateStatus(ctx, redisSentinel, cachev1.RedisPhaseError, err)
	}

	if err := r.reconcileSentinelService(ctx, redisSentinel); err != nil {
		logger.Error(err, "Failed to reconcile Sentinel Service")
		return ctrl.Result{RequeueAfter: 30 * time.Second}, r.updateStatus(ctx, redisSentinel, cachev1.RedisPhaseError, err)
	}

	if err := r.reconcileConfigMaps(ctx, redisSentinel); err != nil {
		logger.Error(err, "Failed to reconcile ConfigMaps")
		return ctrl.Result{RequeueAfter: 30 * time.Second}, r.updateStatus(ctx, redisSentinel, cachev1.RedisPhaseError, err)
	}

	if err := r.reconcilePersistentVolumes(ctx, redisSentinel); err != nil {
		logger.Error(err, "Failed to reconcile Persistent Volumes")
		return ctrl.Result{RequeueAfter: 30 * time.Second}, r.updateStatus(ctx, redisSentinel, cachev1.RedisPhaseError, err)
	}

	if err := r.updateRedisRoleLabels(ctx, redisSentinel); err != nil {
		logger.Error(err, "Failed to update Redis role labels")
		r.Recorder.Eventf(redisSentinel, corev1.EventTypeWarning, "LabelUpdateFailed", "Failed to update Redis role labels: %v", err)
		return ctrl.Result{RequeueAfter: 30 * time.Second}, r.updateStatus(ctx, redisSentinel, cachev1.RedisPhaseError, err)
	}

	if err := r.reconcileRedisMasterEndpoints(ctx, redisSentinel); err != nil {
		logger.Error(err, "Failed to reconcile Redis master Endpoints")
		return ctrl.Result{RequeueAfter: 30 * time.Second}, err
	}

	return ctrl.Result{RequeueAfter: 30 * time.Second}, r.updateStatus(ctx, redisSentinel, cachev1.RedisPhaseReady, nil)
}

func (r *RedisSentinelReconciler) reconcileRedisStatefulSet(ctx context.Context, rs *cachev1.RedisSentinel) error {
	logger := log.FromContext(ctx)
	name := rs.Name + "-redis"

	replicas := rs.Spec.RedisReplicas
	if replicas < 1 {
		replicas = 3
	}

	sts := &appsv1.StatefulSet{
		ObjectMeta: metav1.ObjectMeta{
			Name:      name,
			Namespace: rs.Namespace,
			Labels:    labelsForRedisSentinel(rs.Name, "redis"),
		},
		Spec: appsv1.StatefulSetSpec{
			ServiceName: name + "-headless",
			Replicas:    pointer.Int32(replicas),
			Selector: &metav1.LabelSelector{
				MatchLabels: labelsForRedisSentinel(rs.Name, "redis"),
			},
			Template: corev1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{
					Labels: labelsForRedisSentinel(rs.Name, "redis"),
				},
				Spec: corev1.PodSpec{
					Containers: []corev1.Container{
						{
							Name:            "redis",
							Image:           rs.Spec.Image,
							ImagePullPolicy: corev1.PullIfNotPresent,
							Ports: []corev1.ContainerPort{
								{Name: "redis", ContainerPort: RedisPort},
							},
							VolumeMounts: []corev1.VolumeMount{
								{Name: "redis-config", MountPath: "/redis-config"},
								{Name: "redis-data", MountPath: "/data"},
							},
							Command: []string{"sh", "-c", "sh /redis-config/init.sh"},
						},
					},
					Volumes: []corev1.Volume{
						{
							Name: "redis-config",
							VolumeSource: corev1.VolumeSource{
								ConfigMap: &corev1.ConfigMapVolumeSource{
									LocalObjectReference: corev1.LocalObjectReference{
										Name: rs.Name + "-redis-config",
									},
								},
							},
						},
					},
				},
			},
			VolumeClaimTemplates: []corev1.PersistentVolumeClaim{
				{
					ObjectMeta: metav1.ObjectMeta{Name: "redis-data"},
					Spec: corev1.PersistentVolumeClaimSpec{
						AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce},
						Resources: corev1.ResourceRequirements{
							Requests: corev1.ResourceList{
								corev1.ResourceStorage: rs.Spec.Storage.Size,
							},
						},
						StorageClassName: pointer.String("redis-storage"),
					},
				},
			},
		},
	}

	if err := ctrl.SetControllerReference(rs, sts, r.Scheme); err != nil {
		return err
	}

	foundSts := &appsv1.StatefulSet{}
	err := r.Get(ctx, types.NamespacedName{Name: sts.Name, Namespace: sts.Namespace}, foundSts)
	if err != nil && errors.IsNotFound(err) {
		logger.Info("Creating Redis StatefulSet", "name", sts.Name)
		if err := r.Create(ctx, sts); err != nil {
			return err
		}
	} else if err != nil {
		return err
	} else {
		logger.Info("Updating Redis StatefulSet", "name", sts.Name)
		sts.Spec.DeepCopyInto(&foundSts.Spec)
		if err := r.Update(ctx, foundSts); err != nil {
			return err
		}
	}

	return nil
}

func (r *RedisSentinelReconciler) reconcileSentinelStatefulSet(ctx context.Context, rs *cachev1.RedisSentinel) error {
	logger := log.FromContext(ctx)
	name := rs.Name + "-sentinel"

	replicas := rs.Spec.SentinelReplicas
	if replicas < 1 {
		replicas = 3
	}

	sts := &appsv1.StatefulSet{
		ObjectMeta: metav1.ObjectMeta{
			Name:      name,
			Namespace: rs.Namespace,
			Labels:    labelsForRedisSentinel(rs.Name, "sentinel"),
		},
		Spec: appsv1.StatefulSetSpec{
			ServiceName: name + "-headless",
			Replicas:    pointer.Int32(replicas),
			Selector: &metav1.LabelSelector{
				MatchLabels: labelsForRedisSentinel(rs.Name, "sentinel"),
			},
			Template: corev1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{
					Labels: labelsForRedisSentinel(rs.Name, "sentinel"),
				},
				Spec: corev1.PodSpec{
					Containers: []corev1.Container{
						{
							Name:            "sentinel",
							Image:           rs.Spec.SentinelImage,
							ImagePullPolicy: corev1.PullIfNotPresent,
							Ports: []corev1.ContainerPort{
								{Name: "sentinel", ContainerPort: SentinelPort},
							},
							Command: []string{"sh", "-c", "sh /sentinel-config/init.sh"},
							VolumeMounts: []corev1.VolumeMount{
								{Name: "sentinel-config", MountPath: "/sentinel-config"},
								{Name: "sentinel-config-dir", MountPath: "/tmp"},
							},
						},
					},
					Volumes: []corev1.Volume{
						{
							Name: "sentinel-config",
							VolumeSource: corev1.VolumeSource{
								ConfigMap: &corev1.ConfigMapVolumeSource{
									LocalObjectReference: corev1.LocalObjectReference{
										Name: fmt.Sprintf("%s-sentinel-config", rs.Name),
									},
								},
							},
						},
						{
							Name: "sentinel-config-dir",
							VolumeSource: corev1.VolumeSource{
								EmptyDir: &corev1.EmptyDirVolumeSource{},
							},
						},
					},
				},
			},
		},
	}

	if err := ctrl.SetControllerReference(rs, sts, r.Scheme); err != nil {
		return err
	}

	foundSts := &appsv1.StatefulSet{}
	err := r.Get(ctx, types.NamespacedName{Name: sts.Name, Namespace: sts.Namespace}, foundSts)
	if err != nil && errors.IsNotFound(err) {
		logger.Info("Creating Sentinel StatefulSet", "name", sts.Name)
		if err := r.Create(ctx, sts); err != nil {
			return err
		}
	} else if err != nil {
		return err
	} else {
		logger.Info("Updating Sentinel StatefulSet", "name", sts.Name)
		sts.Spec.DeepCopyInto(&foundSts.Spec)
		if err := r.Update(ctx, foundSts); err != nil {
			return err
		}
	}

	return nil
}

func (r *RedisSentinelReconciler) reconcileRedisService(ctx context.Context, rs *cachev1.RedisSentinel) error {
	headlessSvc := &corev1.Service{
		ObjectMeta: metav1.ObjectMeta{
			Name:      rs.Name + "-redis-headless",
			Namespace: rs.Namespace,
			Labels:    labelsForRedisSentinel(rs.Name, "redis"),
		},
		Spec: corev1.ServiceSpec{
			ClusterIP: corev1.ClusterIPNone,
			Selector:  labelsForRedisSentinel(rs.Name, "redis"),
			Ports: []corev1.ServicePort{
				{Name: "redis", Port: RedisPort, TargetPort: intstr.FromInt(int(RedisPort))},
			},
		},
	}

	nodePortSvc := &corev1.Service{
		ObjectMeta: metav1.ObjectMeta{
			Name:      rs.Name + "-redis",
			Namespace: rs.Namespace,
			Labels:    labelsForRedisSentinel(rs.Name, "redis"),
		},
		Spec: corev1.ServiceSpec{
			Type:     corev1.ServiceTypeNodePort,
			Selector: labelsForRedisSentinel(rs.Name, "redis"),
			Ports: []corev1.ServicePort{
				{Name: "redis", Port: RedisPort, TargetPort: intstr.FromInt(int(RedisPort)), NodePort: rs.Spec.NodePort},
			},
		},
	}

	selector := labelsForRedisSentinel(rs.Name, "redis")
	selector["redis-role"] = "master"
	masterNodePortSvc := &corev1.Service{
		ObjectMeta: metav1.ObjectMeta{
			Name:      rs.Name + "-redis-master",
			Namespace: rs.Namespace,
			Labels:    labelsForRedisSentinel(rs.Name, "redis"),
		},
		Spec: corev1.ServiceSpec{
			Type:     corev1.ServiceTypeNodePort,
			Selector: selector,
			Ports: []corev1.ServicePort{
				{Name: "redis", Port: MasterPort, TargetPort: intstr.FromInt(int(RedisPort)), NodePort: rs.Spec.MasterNodePort},
			},
		},
	}

	if err := ctrl.SetControllerReference(rs, headlessSvc, r.Scheme); err != nil {
		return err
	}
	if err := ctrl.SetControllerReference(rs, nodePortSvc, r.Scheme); err != nil {
		return err
	}
	if err := ctrl.SetControllerReference(rs, masterNodePortSvc, r.Scheme); err != nil {
		return err
	}

	if err := r.createOrUpdateService(ctx, headlessSvc); err != nil {
		return err
	}
	if err := r.createOrUpdateService(ctx, nodePortSvc); err != nil {
		return err
	}
	return r.createOrUpdateService(ctx, masterNodePortSvc)
}

func (r *RedisSentinelReconciler) reconcileSentinelService(ctx context.Context, rs *cachev1.RedisSentinel) error {
	headlessSvc := &corev1.Service{
		ObjectMeta: metav1.ObjectMeta{
			Name:      rs.Name + "-sentinel-headless",
			Namespace: rs.Namespace,
			Labels:    labelsForRedisSentinel(rs.Name, "sentinel"),
		},
		Spec: corev1.ServiceSpec{
			ClusterIP: corev1.ClusterIPNone,
			Selector:  labelsForRedisSentinel(rs.Name, "sentinel"),
			Ports: []corev1.ServicePort{
				{Name: "sentinel", Port: SentinelPort, TargetPort: intstr.FromInt(int(SentinelPort))},
			},
		},
	}

	nodePortSvc := &corev1.Service{
		ObjectMeta: metav1.ObjectMeta{
			Name:      rs.Name + "-sentinel",
			Namespace: rs.Namespace,
			Labels:    labelsForRedisSentinel(rs.Name, "sentinel"),
		},
		Spec: corev1.ServiceSpec{
			Type:     corev1.ServiceTypeNodePort,
			Selector: labelsForRedisSentinel(rs.Name, "sentinel"),
			Ports: []corev1.ServicePort{
				{Name: "sentinel", Port: SentinelPort, TargetPort: intstr.FromInt(int(SentinelPort)), NodePort: rs.Spec.SentinelNodePort},
			},
		},
	}

	if err := ctrl.SetControllerReference(rs, headlessSvc, r.Scheme); err != nil {
		return err
	}
	if err := ctrl.SetControllerReference(rs, nodePortSvc, r.Scheme); err != nil {
		return err
	}

	if err := r.createOrUpdateService(ctx, headlessSvc); err != nil {
		return err
	}
	return r.createOrUpdateService(ctx, nodePortSvc)
}

func (r *RedisSentinelReconciler) reconcileConfigMaps(ctx context.Context, rs *cachev1.RedisSentinel) error {
	masterHost := fmt.Sprintf("%s-redis-0.%s-redis-headless.%s.svc.cluster.local", rs.Name, rs.Name, rs.Namespace)

	redisCM := &corev1.ConfigMap{
		ObjectMeta: metav1.ObjectMeta{
			Name:      rs.Name + "-redis-config",
			Namespace: rs.Namespace,
			Labels:    labelsForRedisSentinel(rs.Name, "redis"),
		},
		Data: map[string]string{
			"redis-master.conf":  redisMasterConfig,
			"redis-replica.conf": redisReplicaConfig,
			"init.sh":            redisInitSh(masterHost),
		},
	}

	sentinelCM := &corev1.ConfigMap{
		ObjectMeta: metav1.ObjectMeta{
			Name:      rs.Name + "-sentinel-config",
			Namespace: rs.Namespace,
			Labels:    labelsForRedisSentinel(rs.Name, "sentinel"),
		},
		Data: map[string]string{
			"sentinel.conf": sentinelConfig(masterHost, rs.Spec.SentinelReplicas),
			"init.sh":       sentinelInitConfig,
		},
	}

	if err := ctrl.SetControllerReference(rs, redisCM, r.Scheme); err != nil {
		return err
	}
	if err := ctrl.SetControllerReference(rs, sentinelCM, r.Scheme); err != nil {
		return err
	}

	if err := r.createOrUpdateConfigMap(ctx, redisCM); err != nil {
		return err
	}
	return r.createOrUpdateConfigMap(ctx, sentinelCM)
}

func (r *RedisSentinelReconciler) reconcilePersistentVolumes(ctx context.Context, rs *cachev1.RedisSentinel) error {
	logger := log.FromContext(ctx)

	replicas := rs.Spec.RedisReplicas
	if replicas < 1 {
		replicas = 3
	}

	for i := 0; i < int(replicas); i++ {
		pvName := fmt.Sprintf("%s-redis-pv-%d", rs.Name, i)
		pvPath := fmt.Sprintf("%s/%s/redis-%d", rs.Spec.Storage.HostPath, rs.Name, i)

		pv := &corev1.PersistentVolume{
			ObjectMeta: metav1.ObjectMeta{Name: pvName},
			Spec: corev1.PersistentVolumeSpec{
				Capacity: corev1.ResourceList{
					corev1.ResourceStorage: rs.Spec.Storage.Size,
				},
				AccessModes:                   []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce},
				PersistentVolumeReclaimPolicy: corev1.PersistentVolumeReclaimRetain,
				StorageClassName:              "redis-storage",
				PersistentVolumeSource: corev1.PersistentVolumeSource{
					HostPath: &corev1.HostPathVolumeSource{Path: pvPath},
				},
				NodeAffinity: &corev1.VolumeNodeAffinity{
					Required: &corev1.NodeSelector{
						NodeSelectorTerms: []corev1.NodeSelectorTerm{
							{
								MatchExpressions: []corev1.NodeSelectorRequirement{
									{
										Key:      "iguochan.io/redis-node",
										Operator: corev1.NodeSelectorOpIn,
										Values:   []string{fmt.Sprintf("redis%d", i%3+1)},
									},
								},
							},
						},
					},
				},
			},
		}

		if err := r.Create(ctx, pv); err != nil {
			if !errors.IsAlreadyExists(err) {
				logger.Error(err, "Failed to create PV", "name", pv.Name)
				return err
			}
			logger.Info("PV already exists", "name", pv.Name)
		}
	}

	for i := 0; i < int(rs.Spec.SentinelReplicas); i++ {
		pvName := fmt.Sprintf("%s-sentinel-pv-%d", rs.Name, i)
		pvPath := fmt.Sprintf("%s/%s/sentinel-%d", rs.Spec.Storage.HostPath, rs.Name, i)

		pv := &corev1.PersistentVolume{
			ObjectMeta: metav1.ObjectMeta{Name: pvName},
			Spec: corev1.PersistentVolumeSpec{
				Capacity: corev1.ResourceList{
					corev1.ResourceStorage: resource.MustParse("100Mi"),
				},
				AccessModes:                   []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce},
				PersistentVolumeReclaimPolicy: corev1.PersistentVolumeReclaimRetain,
				StorageClassName:              "redis-storage",
				PersistentVolumeSource: corev1.PersistentVolumeSource{
					HostPath: &corev1.HostPathVolumeSource{Path: pvPath},
				},
				NodeAffinity: &corev1.VolumeNodeAffinity{
					Required: &corev1.NodeSelector{
						NodeSelectorTerms: []corev1.NodeSelectorTerm{
							{
								MatchExpressions: []corev1.NodeSelectorRequirement{
									{
										Key:      "iguochan.io/redis-node",
										Operator: corev1.NodeSelectorOpIn,
										Values:   []string{fmt.Sprintf("redis%d", i%3+1)},
									},
								},
							},
						},
					},
				},
			},
		}

		if err := r.Create(ctx, pv); err != nil {
			if !errors.IsAlreadyExists(err) {
				logger.Error(err, "Failed to create PV", "name", pv.Name)
				return err
			}
			logger.Info("PV already exists", "name", pv.Name)
		}
	}

	return nil
}

func (r *RedisSentinelReconciler) updateStatus(ctx context.Context, rs *cachev1.RedisSentinel, phase cachev1.RedisPhase, err error) error {
	if err != nil && phase != cachev1.RedisPhaseReady {
		rs.Status.Phase = phase
		_ = r.Status().Update(ctx, rs)
		return fmt.Errorf("err: %+v or phase: %s", err, phase)
	}

	sentinelReady, sentinelErr := r.checkPodsReady(ctx, rs, "sentinel")
	if !sentinelReady {
		return sentinelErr
	}

	redisReady, redisErr := r.checkPodsReady(ctx, rs, "redis")
	if !redisReady {
		return redisErr
	}

	sentinelSvc, svcErr := r.validateService(ctx, rs, rs.Name+"-sentinel")
	if svcErr != nil {
		return svcErr
	}

	redisSvc, svcErr := r.validateService(ctx, rs, rs.Name+"-redis")
	if svcErr != nil {
		return svcErr
	}

	rs.Status.SentinelEndpoint = fmt.Sprintf("%s:%d", sentinelSvc.Spec.ClusterIP, sentinelSvc.Spec.Ports[0].Port)
	rs.Status.Endpoint = fmt.Sprintf("%s:%d", redisSvc.Spec.ClusterIP, redisSvc.Spec.Ports[0].Port)
	rs.Status.Phase = cachev1.RedisPhaseReady

	return r.Status().Update(ctx, rs)
}

func (r *RedisSentinelReconciler) checkPodsReady(ctx context.Context, rs *cachev1.RedisSentinel, role string) (bool, error) {
	podList := &corev1.PodList{}
	labels := client.MatchingLabels{
		"app":       "redis-sentinel",
		"name":      rs.Name,
		"component": role,
	}

	if err := r.List(ctx, podList, labels); err != nil {
		r.Recorder.Eventf(rs, corev1.EventTypeWarning, RecordReasonFailed, "list %s pods failed: %s", role, err.Error())
		return false, err
	}

	if len(podList.Items) == 0 {
		msg := fmt.Sprintf("no %s pods available", role)
		r.Recorder.Event(rs, corev1.EventTypeNormal, RecordReasonWaiting, msg)
		rs.Status.Phase = cachev1.RedisPhasePending
		return false, r.Status().Update(ctx, rs)
	}

	allReady := true
	for _, pod := range podList.Items {
		if !isPodReady(pod) {
			allReady = false
			break
		}
	}

	if !allReady {
		msg := fmt.Sprintf("not all %s pods are ready", role)
		r.Recorder.Event(rs, corev1.EventTypeNormal, RecordReasonWaiting, msg)
		rs.Status.Phase = cachev1.RedisPhasePending
		return false, r.Status().Update(ctx, rs)
	}

	return true, nil
}

func isPodReady(pod corev1.Pod) bool {
	for _, cond := range pod.Status.Conditions {
		if cond.Type == corev1.PodReady {
			return cond.Status == corev1.ConditionTrue
		}
	}
	return false
}

func (r *RedisSentinelReconciler) validateService(ctx context.Context, rs *cachev1.RedisSentinel, svcName string) (*corev1.Service, error) {
	svc := &corev1.Service{}
	key := types.NamespacedName{Namespace: rs.Namespace, Name: svcName}

	if err := r.Get(ctx, key, svc); err != nil {
		r.Recorder.Eventf(rs, corev1.EventTypeWarning, RecordReasonFailed, "get %s service failed: %s", svcName, err.Error())
		rs.Status.Phase = cachev1.RedisPhaseError
		return nil, r.Status().Update(ctx, rs)
	}

	endpoints := &corev1.Endpoints{}
	if err := r.Get(ctx, key, endpoints); err != nil {
		r.Recorder.Eventf(rs, corev1.EventTypeWarning, RecordReasonFailed, "get %s endpoints failed: %s", svcName, err.Error())
		rs.Status.Phase = cachev1.RedisPhaseError
		return nil, r.Status().Update(ctx, rs)
	}

	if len(endpoints.Subsets) == 0 || len(endpoints.Subsets[0].Addresses) == 0 {
		r.Recorder.Eventf(rs, corev1.EventTypeWarning, RecordReasonFailed, "%s service has no endpoints", svcName)
		rs.Status.Phase = cachev1.RedisPhaseError
		return nil, r.Status().Update(ctx, rs)
	}

	return svc, nil
}

func labelsForRedisSentinel(name, role string) map[string]string {
	return map[string]string{
		"app":       "redis-sentinel",
		"name":      name,
		"component": role,
	}
}

func (r *RedisSentinelReconciler) createOrUpdateService(ctx context.Context, svc *corev1.Service) error {
	foundSvc := &corev1.Service{}
	err := r.Get(ctx, types.NamespacedName{Name: svc.Name, Namespace: svc.Namespace}, foundSvc)
	if err != nil && errors.IsNotFound(err) {
		return r.Create(ctx, svc)
	} else if err != nil {
		return err
	}

	if svc.Spec.Type == corev1.ServiceTypeNodePort {
		for i, p := range svc.Spec.Ports {
			foundSvc.Spec.Ports[i].Port = p.Port
			foundSvc.Spec.Ports[i].TargetPort = p.TargetPort
			foundSvc.Spec.Ports[i].NodePort = p.NodePort
		}
	}

	return r.Update(ctx, foundSvc)
}

func (r *RedisSentinelReconciler) createOrUpdateConfigMap(ctx context.Context, cm *corev1.ConfigMap) error {
	foundCM := &corev1.ConfigMap{}
	err := r.Get(ctx, types.NamespacedName{Name: cm.Name, Namespace: cm.Namespace}, foundCM)
	if err != nil && errors.IsNotFound(err) {
		return r.Create(ctx, cm)
	} else if err != nil {
		return err
	}

	foundCM.Data = cm.Data
	return r.Update(ctx, foundCM)
}

func (r *RedisSentinelReconciler) updateRedisRoleLabels(ctx context.Context, rs *cachev1.RedisSentinel) error {
	podList := &corev1.PodList{}
	if err := r.List(ctx, podList, client.MatchingLabels{
		"app":       "redis-sentinel",
		"name":      rs.Name,
		"component": "redis",
	}); err != nil {
		return err
	}

	var ip string
	var err error
	sentinelPods := &corev1.PodList{}
	if err = r.List(ctx, sentinelPods, client.MatchingLabels{
		"app":       "redis-sentinel",
		"name":      rs.Name,
		"component": "sentinel",
	}); err != nil || len(sentinelPods.Items) == 0 {
		return fmt.Errorf("list sentinel pods err: %+v or len(sentinelPods.Items) == 0", err)
	}

	if ip, _, err = r.getSentinelMasterAddr(ctx, &sentinelPods.Items[0]); err != nil {
		return err
	}

	for _, pod := range podList.Items {
		newRole := "slave"
		if pod.Status.PodIP == ip || strings.Contains(ip, pod.Spec.Hostname) {
			newRole = "master"
		}

		if pod.Labels["redis-role"] != newRole {
			patch := client.MergeFrom(pod.DeepCopy())
			if pod.Labels == nil {
				pod.Labels = make(map[string]string)
			}
			pod.Labels["redis-role"] = newRole
			if err := r.Patch(ctx, &pod, patch); err != nil {
				return err
			}
		}
	}
	return nil
}

func (r *RedisSentinelReconciler) reconcileRedisMasterEndpoints(ctx context.Context, rs *cachev1.RedisSentinel) error {
	podList := &corev1.PodList{}
	if err := r.List(ctx, podList, client.MatchingLabels{
		"app":        "redis-sentinel",
		"name":       rs.Name,
		"component":  "redis",
		"redis-role": "master",
	}); err != nil {
		return err
	}

	if len(podList.Items) == 0 {
		return nil
	}

	masterPod := podList.Items[0]
	if masterPod.Status.PodIP == "" {
		return fmt.Errorf("reconcileRedisMasterEndpoints: masterPod.Status.PodIP is empty")
	}

	endpoints := &corev1.Endpoints{
		ObjectMeta: metav1.ObjectMeta{
			Name:      rs.Name + "-redis-master",
			Namespace: rs.Namespace,
		},
		Subsets: []corev1.EndpointSubset{
			{
				Addresses: []corev1.EndpointAddress{
					{
						IP: masterPod.Status.PodIP,
						TargetRef: &corev1.ObjectReference{
							Kind:      "Pod",
							Name:      masterPod.Name,
							Namespace: masterPod.Namespace,
						},
					},
				},
				Ports: []corev1.EndpointPort{
					{Port: RedisPort},
				},
			},
		},
	}

	if err := ctrl.SetControllerReference(rs, endpoints, r.Scheme); err != nil {
		return err
	}

	found := &corev1.Endpoints{}
	err := r.Get(ctx, types.NamespacedName{Name: endpoints.Name, Namespace: endpoints.Namespace}, found)
	if err != nil && errors.IsNotFound(err) {
		return r.Create(ctx, endpoints)
	} else if err != nil {
		return err
	}

	needsUpdate := false
	if len(found.Subsets) == 0 {
		needsUpdate = true
	} else if len(found.Subsets[0].Addresses) == 0 || found.Subsets[0].Addresses[0].IP != masterPod.Status.PodIP {
		needsUpdate = true
	}

	if needsUpdate {
		found.Subsets = endpoints.Subsets
		return r.Update(ctx, found)
	}

	return nil
}

func (r *RedisSentinelReconciler) getSentinelMasterAddr(ctx context.Context, sentinelPod *corev1.Pod) (string, string, error) {
	logger := log.FromContext(ctx)

	sentinelAddr := fmt.Sprintf("%s:%d", sentinelPod.Status.PodIP, SentinelPort)
	sentinelClient := redis.NewSentinelClient(&redis.Options{
		Addr:     sentinelAddr,
		Password: "",
		DB:       0,
	})

	var masterIP, masterPort string
	var lastErr error

	for i := 0; i < 5; i++ {
		result, err := sentinelClient.GetMasterAddrByName("mymaster").Result()
		if err == nil && len(result) >= 2 {
			masterIP = result[0]
			masterPort = result[1]

			if r.isPodAlive(ctx, masterIP) {
				logger.Info(fmt.Sprintf("getSentinelMasterAddr: %s, %s", masterIP, masterPort))
				return masterIP, masterPort, nil
			}
			logger.Info("Master IP reported but pod not alive", "ip", masterIP)
		} else if err != nil {
			lastErr = err
		}

		time.Sleep(2 * time.Second)
	}

	return "", "", fmt.Errorf("failed to get valid master address after 5 attempts: %v", lastErr)
}

func (r *RedisSentinelReconciler) isPodAlive(ctx context.Context, ip string) bool {
	pods := &corev1.PodList{}
	if err := r.List(ctx, pods); err != nil {
		return false
	}

	for _, pod := range pods.Items {
		if pod.Status.PodIP == ip || strings.Contains(ip, pod.Spec.Hostname) {
			return pod.DeletionTimestamp == nil
		}
	}
	return false
}

func (r *RedisSentinelReconciler) checkPodDeletion(ctx context.Context, rs *cachev1.RedisSentinel) bool {
	podList := &corev1.PodList{}
	if err := r.List(ctx, podList, client.MatchingLabels{
		"app":       "redis-sentinel",
		"name":      rs.Name,
		"component": "redis",
	}); err != nil {
		return false
	}

	for _, pod := range podList.Items {
		if pod.DeletionTimestamp != nil {
			return true
		}
	}
	return false
}

func (r *RedisSentinelReconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		For(&cachev1.RedisSentinel{}).
		Owns(&appsv1.StatefulSet{}).
		Owns(&corev1.Service{}).
		Owns(&corev1.ConfigMap{}).
		Owns(&corev1.Endpoints{}).
		Complete(r)
}

A few points deserve attention. For example, we label each redis pod with its current role (master or slave), which is what allows the xxx-redis-master Service to always find the master pod.
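The xxx-redis-master Service works purely by label selection: only the pod whose redis-role label is currently master satisfies the Service selector. The matching rule can be sketched with the standard library alone (the helper name is ours; the maps mirror labelsForRedisSentinel plus the role label):

```go
package main

import "fmt"

// matches reports whether a pod's labels satisfy every key/value pair in the
// selector — the same semantics a Service selector uses to pick endpoints.
func matches(podLabels, selector map[string]string) bool {
	for k, v := range selector {
		if podLabels[k] != v {
			return false
		}
	}
	return true
}

func main() {
	selector := map[string]string{"app": "redis-sentinel", "component": "redis", "redis-role": "master"}
	master := map[string]string{"app": "redis-sentinel", "component": "redis", "redis-role": "master"}
	replica := map[string]string{"app": "redis-sentinel", "component": "redis", "redis-role": "slave"}
	fmt.Println(matches(master, selector), matches(replica, selector)) // prints: true false
}
```

When Sentinel promotes a different pod, the controller re-labels the pods, and the master Service starts routing to the new master without any Service change.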

2.2.3 Admission control

$ kubebuilder create webhook --group cache --version v1 --kind RedisSentinel --defaulting --programmatic-validation

The command above creates the admission webhooks for RedisSentinel. Their implementation follows and needs little extra commentary:

package v1

import ( "fmt"

"k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/validation/field" ctrl "sigs.k8s.io/controller-runtime" logf "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/webhook" )

const ( MinRedisSize = "1Gi" MaxRedisSize = "100Gi" MinSentinelSize = "100Mi" MaxSentinelSize = "1Gi" MinReplicas = 3 MaxReplicas = 7 MinQuorum = 2 )

var redissentinellog = logf.Log.WithName("redissentinel-resource")

func (r *RedisSentinel) SetupWebhookWithManager(mgr ctrl.Manager) error { return ctrl.NewWebhookManagedBy(mgr). For(r). Complete() }

var _ webhook.Defaulter = &RedisSentinel{}

func (r *RedisSentinel) Default() {
	redissentinellog.Info("default", "name", r.Name)

	if r.Spec.Image == "" {
		r.Spec.Image = "redis:7.0"
		redissentinellog.Info("Setting default Redis image", "image", r.Spec.Image)
	}

	if r.Spec.SentinelImage == "" {
		r.Spec.SentinelImage = "redis:7.0"
		redissentinellog.Info("Setting default Sentinel image", "image", r.Spec.SentinelImage)
	}

	if r.Spec.MasterNodePort == 0 {
		r.Spec.MasterNodePort = 30999
		redissentinellog.Info("Setting default Redis Master NodePort", "nodePort", r.Spec.MasterNodePort)
	}

	if r.Spec.NodePort == 0 {
		r.Spec.NodePort = 31000
		redissentinellog.Info("Setting default Redis NodePort", "nodePort", r.Spec.NodePort)
	}

	if r.Spec.SentinelNodePort == 0 {
		r.Spec.SentinelNodePort = 31001
		redissentinellog.Info("Setting default Sentinel NodePort", "sentinelNodePort", r.Spec.SentinelNodePort)
	}

	if r.Spec.RedisReplicas == 0 {
		r.Spec.RedisReplicas = 3
		redissentinellog.Info("Setting default Redis replicas", "redisReplicas", r.Spec.RedisReplicas)
	}

	if r.Spec.SentinelReplicas == 0 {
		r.Spec.SentinelReplicas = 3
		redissentinellog.Info("Setting default Sentinel replicas", "sentinelReplicas", r.Spec.SentinelReplicas)
	}

	if r.Spec.Storage.HostPath == "" {
		r.Spec.Storage.HostPath = "/data"
		redissentinellog.Info("Setting default host path", "hostPath", r.Spec.Storage.HostPath)
	}

	if r.Spec.Storage.Size.IsZero() {
		size := resource.MustParse("1Gi")
		r.Spec.Storage.Size = size
		redissentinellog.Info("Setting default Redis storage size", "size", size.String())
	}
}

var _ webhook.Validator = &RedisSentinel{}

func (r *RedisSentinel) ValidateCreate() error {
	redissentinellog.Info("validate create", "name", r.Name)

	return r.validateRedisSentinel()
}

func (r *RedisSentinel) ValidateUpdate(old runtime.Object) error {
	redissentinellog.Info("validate update", "name", r.Name)

	oldSentinel, ok := old.(*RedisSentinel)
	if !ok {
		return fmt.Errorf("expected a RedisSentinel object but got %T", old)
	}

	if err := r.validateRedisSentinel(); err != nil {
		return err
	}

	if oldSentinel.Spec.Image != r.Spec.Image {
		return field.Forbidden(
			field.NewPath("spec", "image"),
			"Redis image cannot be changed after creation",
		)
	}

	if oldSentinel.Spec.SentinelImage != r.Spec.SentinelImage {
		return field.Forbidden(
			field.NewPath("spec", "sentinelImage"),
			"Sentinel image cannot be changed after creation",
		)
	}

	if oldSentinel.Spec.Storage.HostPath != r.Spec.Storage.HostPath {
		return field.Forbidden(
			field.NewPath("spec", "storage", "hostPath"),
			"hostPath cannot be changed after creation",
		)
	}

	return nil
}

func (r *RedisSentinel) ValidateDelete() error {
	redissentinellog.Info("validate delete", "name", r.Name)

	return nil
}

func (r *RedisSentinel) validateRedisSentinel() error {
	allErrs := field.ErrorList{}

	if err := validateStorageSize(
		r.Spec.Storage.Size, MinRedisSize, MaxRedisSize,
		field.NewPath("spec", "storage", "size"), "Redis"); err != nil {
		allErrs = append(allErrs, err)
	}

	if r.Spec.SentinelReplicas < MinReplicas {
		allErrs = append(allErrs, field.Invalid(
			field.NewPath("spec", "sentinelReplicas"),
			r.Spec.SentinelReplicas,
			fmt.Sprintf("Sentinel replicas must be at least %d", MinReplicas),
		))
	} else if r.Spec.SentinelReplicas > MaxReplicas {
		allErrs = append(allErrs, field.Invalid(
			field.NewPath("spec", "sentinelReplicas"),
			r.Spec.SentinelReplicas,
			fmt.Sprintf("Sentinel replicas must be no more than %d", MaxReplicas),
		))
	}

	if r.Spec.RedisReplicas < MinReplicas {
		allErrs = append(allErrs, field.Invalid(
			field.NewPath("spec", "redisReplicas"),
			r.Spec.RedisReplicas,
			fmt.Sprintf("Redis replicas must be at least %d", MinReplicas),
		))
	} else if r.Spec.RedisReplicas > MaxReplicas {
		allErrs = append(allErrs, field.Invalid(
			field.NewPath("spec", "redisReplicas"),
			r.Spec.RedisReplicas,
			fmt.Sprintf("Redis replicas must be no more than %d", MaxReplicas),
		))
	}

	if r.Spec.SentinelReplicas < MinQuorum*2-1 {
		allErrs = append(allErrs, field.Invalid(
			field.NewPath("spec", "sentinelReplicas"),
			r.Spec.SentinelReplicas,
			fmt.Sprintf("Sentinel replicas must be at least %d for a quorum of %d", MinQuorum*2-1, MinQuorum),
		))
	}

	if r.Spec.MasterNodePort < 30000 || r.Spec.MasterNodePort > 32767 {
		allErrs = append(allErrs, field.Invalid(
			field.NewPath("spec", "masterNodePort"),
			r.Spec.MasterNodePort,
			"Redis master nodePort must be between 30000 and 32767",
		))
	}

	if r.Spec.NodePort < 30000 || r.Spec.NodePort > 32767 {
		allErrs = append(allErrs, field.Invalid(
			field.NewPath("spec", "nodePort"),
			r.Spec.NodePort,
			"Redis nodePort must be between 30000 and 32767",
		))
	}

	if r.Spec.SentinelNodePort < 30000 || r.Spec.SentinelNodePort > 32767 {
		allErrs = append(allErrs, field.Invalid(
			field.NewPath("spec", "sentinelNodePort"),
			r.Spec.SentinelNodePort,
			"Sentinel nodePort must be between 30000 and 32767",
		))
	}

	if !isValidHostPath(r.Spec.Storage.HostPath) {
		allErrs = append(allErrs, field.Invalid(
			field.NewPath("spec", "storage", "hostPath"),
			r.Spec.Storage.HostPath,
			"invalid host path, only /data directory is allowed",
		))
	}

	if len(allErrs) == 0 {
		return nil
	}

	return allErrs.ToAggregate()
}

func validateStorageSize(size resource.Quantity, min, max string, path *field.Path, resourceType string) *field.Error {
	minSize := resource.MustParse(min)
	maxSize := resource.MustParse(max)

	if size.Cmp(minSize) < 0 {
		return field.Invalid(
			path, size.String(),
			fmt.Sprintf("%s storage size must be at least %s", resourceType, min),
		)
	}

	if size.Cmp(maxSize) > 0 {
		return field.Invalid(
			path, size.String(),
			fmt.Sprintf("%s storage size must be no more than %s", resourceType, max),
		)
	}

	return nil
}

func validateSentinelQuorum(replicas int32) bool {
	return replicas >= MinQuorum && replicas%2 == 1
}
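The reason the validator insists on at least `MinQuorum*2-1` (i.e. 3) Sentinels, preferably an odd number, is that a failover needs not only `quorum` Sentinels to agree the master is objectively down, but also a strict majority of all Sentinels to elect the leader that performs it. A small sketch of that arithmetic (the function names here are illustrative, not part of the Operator):

```go
package main

import "fmt"

// failoverMajority returns how many Sentinels must agree to elect the
// leader Sentinel that performs the failover: a strict majority.
func failoverMajority(sentinels int) int {
	return sentinels/2 + 1
}

// tolerableFailures returns how many Sentinel nodes can be lost while
// a failover is still possible.
func tolerableFailures(sentinels int) int {
	return sentinels - failoverMajority(sentinels)
}

func main() {
	for _, n := range []int{3, 4, 5} {
		fmt.Printf("sentinels=%d majority=%d tolerates=%d\n",
			n, failoverMajority(n), tolerableFailures(n))
	}
	// 3 and 4 Sentinels both tolerate only one failure, which is why
	// an odd count is the recommended deployment.
}
```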

3. Verification

After publishing the CRD with the usual series of commands, we can start verifying. First, the basic flow:

Read/write via the master port

$ redis-cli -h 127.0.0.1 -p 6378
127.0.0.1:6378> get key2
(nil)
127.0.0.1:6378> set key2 hello
OK

Read via the replica port

$ redis-cli -h 127.0.0.1 -p 6379
127.0.0.1:6379> get key2
"hello"
127.0.0.1:6379> set key2 hello1
(error) READONLY You can't write against a read only replica.

As expected, the replica port is read-only. This is not strictly guaranteed, though: since redis-cli holds a long-lived TCP connection, the connection behind this port may well land on the master.

The Sentinel port

$ redis-cli -h 127.0.0.1 -p 26379
127.0.0.1:26379> SENTINEL master mymaster
 1) "name"
 2) "mymaster"
 3) "ip"
 4) "redissentinel-sample-redis-0.redissentinel-sample-redis-headless.default.svc.cluster.local"
 5) "port"
 6) "6379"
 7) "runid"
 8) "7792152f59bc4716a8d88a76cd39ed19c2bc0c92"
 9) "flags"
10) "master"
11) "link-pending-commands"
12) "0"
13) "link-refcount"
14) "1"
15) "last-ping-sent"
16) "0"
17) "last-ok-ping-reply"
18) "415"
19) "last-ping-reply"
20) "415"
21) "down-after-milliseconds"
22) "5000"
23) "info-refresh"
24) "7391"
25) "role-reported"
26) "master"
27) "role-reported-time"
28) "25018208"
29) "config-epoch"
30) "0"
31) "num-slaves"
32) "2"
33) "num-other-sentinels"
34) "2"
35) "quorum"
36) "2"
37) "failover-timeout"
38) "10000"
39) "parallel-syncs"
40) "1"
127.0.0.1:26379> SENTINEL slaves mymaster
1)  1) "name"
    2) "10.244.1.5:6379"
    3) "ip"
    4) "10.244.1.5"
    5) "port"
    6) "6379"
    7) "runid"
    8) "099f9411d941e3bfe6888870afd260e9b5eea60e"
    9) "flags"
   10) "slave"
   11) "link-pending-commands"
   12) "0"
   13) "link-refcount"
   14) "1"
   15) "last-ping-sent"
   16) "0"
   17) "last-ok-ping-reply"
   18) "247"
   19) "last-ping-reply"
   20) "247"
   21) "down-after-milliseconds"
   22) "5000"
   23) "info-refresh"
   24) "7115"
   25) "role-reported"
   26) "slave"
   27) "role-reported-time"
   28) "25029569"
   29) "master-link-down-time"
   30) "0"
   31) "master-link-status"
   32) "ok"
   33) "master-host"
   34) "redissentinel-sample-redis-0.redissentinel-sample-redis-headless.default.svc.cluster.local"
   35) "master-port"
   36) "6379"
   37) "slave-priority"
   38) "100"
   39) "slave-repl-offset"
   40) "3549038"
   41) "replica-announced"
   42) "1"
2)  1) "name"
    2) "10.244.3.2:6379"
    3) "ip"
    4) "10.244.3.2"
    5) "port"
    6) "6379"
    7) "runid"
    8) "ae038757a97446ccc7325812d929b7c1e7a3fa0f"
    9) "flags"
   10) "slave"
   11) "link-pending-commands"
   12) "0"
   13) "link-refcount"
   14) "1"
   15) "last-ping-sent"
   16) "0"
   17) "last-ok-ping-reply"
   18) "247"
   19) "last-ping-reply"
   20) "247"
   21) "down-after-milliseconds"
   22) "5000"
   23) "info-refresh"
   24) "7241"
   25) "role-reported"
   26) "slave"
   27) "role-reported-time"
   28) "25029572"
   29) "master-link-down-time"
   30) "0"
   31) "master-link-status"
   32) "ok"
   33) "master-host"
   34) "redissentinel-sample-redis-0.redissentinel-sample-redis-headless.default.svc.cluster.local"
   35) "master-port"
   36) "6379"
   37) "slave-priority"
   38) "100"
   39) "slave-repl-offset"
   40) "3549038"
   41) "replica-announced"
   42) "1"
127.0.0.1:26379> SENTINEL get-master-addr-by-name mymaster
1) "redissentinel-sample-redis-0.redissentinel-sample-redis-headless.default.svc.cluster.local"
2) "6379"
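Under the hood, `SENTINEL get-master-addr-by-name` is an ordinary Redis command sent over the RESP wire protocol to port 26379, exactly like any other command. A minimal sketch of how that request is serialized (the `encodeRESP` helper is a hypothetical name for illustration):

```go
package main

import (
	"fmt"
	"strings"
)

// encodeRESP serializes a command as a RESP array of bulk strings,
// the wire format redis-cli uses for every command it sends.
func encodeRESP(args ...string) string {
	var b strings.Builder
	fmt.Fprintf(&b, "*%d\r\n", len(args)) // array header: element count
	for _, a := range args {
		fmt.Fprintf(&b, "$%d\r\n%s\r\n", len(a), a) // bulk string: length, then payload
	}
	return b.String()
}

func main() {
	// These are the exact bytes that travel over the TCP connection
	// to the Sentinel port.
	fmt.Printf("%q\n", encodeRESP("SENTINEL", "get-master-addr-by-name", "mymaster"))
}
```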

Failover verification

We can see the current master is redissentinel-sample-redis-0, so let's delete that Pod:

$ k get pod --show-labels
NAME                              READY   STATUS    RESTARTS        AGE   LABELS
redissentinel-sample-redis-0      1/1     Running   2 (7h10m ago)   19d   app=redis-sentinel,component=redis,controller-revision-hash=redissentinel-sample-redis-9c894dbc9,name=redissentinel-sample,redis-role=master,statefulset.kubernetes.io/pod-name=redissentinel-sample-redis-0
redissentinel-sample-redis-1      1/1     Running   2 (7h10m ago)   19d   app=redis-sentinel,component=redis,controller-revision-hash=redissentinel-sample-redis-9c894dbc9,name=redissentinel-sample,redis-role=slave,statefulset.kubernetes.io/pod-name=redissentinel-sample-redis-1
redissentinel-sample-redis-2      1/1     Running   2 (7h10m ago)   19d   app=redis-sentinel,component=redis,controller-revision-hash=redissentinel-sample-redis-9c894dbc9,name=redissentinel-sample,redis-role=slave,statefulset.kubernetes.io/pod-name=redissentinel-sample-redis-2
$ k delete pod redissentinel-sample-redis-0
pod "redissentinel-sample-redis-0" deleted

Back on the master port:

127.0.0.1:6378> get key2
"hello"
127.0.0.1:6378> set key2 hello1
OK
127.0.0.1:6378> get key2
"hello1"

The master port still supports both reads and writes. Now check the replica port:

127.0.0.1:6379> get key2
"hello1"
127.0.0.1:6379> set key2 hello
(error) READONLY You can't write against a read only replica.

Finally, verify the current master via the Sentinel port:

127.0.0.1:26379> SENTINEL master mymaster
 ...
 3) "ip"
 4) "10.244.1.5"
 ...

And that IP belongs to the third Pod, redissentinel-sample-redis-2:

$ k get pod -o wide
NAME                           READY   STATUS    RESTARTS        AGE     IP           NODE            NOMINATED NODE   READINESS GATES
redissentinel-sample-redis-0   1/1     Running   0               3m54s   10.244.2.6   multi-worker    <none>           <none>
redissentinel-sample-redis-1   1/1     Running   2 (7h21m ago)   19d     10.244.3.2   multi-worker2   <none>           <none>
redissentinel-sample-redis-2   1/1     Running   2 (7h21m ago)   19d     10.244.1.5   multi-worker3   <none>           <none>

This scheme still has significant problems, which repeated attempts make obvious:

  1. Existing redis-cli sessions must reconnect afterwards, because they hold long-lived TCP connections;
  2. After a failover it can take a moment for the role label to catch up. This could be handled more gracefully in code, but since this is a demo whose real goal is learning how to implement an Operator, I won't dig deeper here.
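The first problem is normally solved on the client side: Sentinel-aware clients re-query `SENTINEL get-master-addr-by-name` and reconnect whenever a write fails with READONLY or the connection drops. A pure-Go sketch of that retry loop, with the connection hidden behind an interface and fake clients standing in for real sockets (all names here are hypothetical):

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// writer abstracts a Redis connection; a real implementation would wrap
// a TCP connection to the address Sentinel reports as the master.
type writer interface {
	Set(key, value string) error
}

// setWithFailover retries a write once after re-resolving the master,
// mimicking what Sentinel-aware clients do after a failover.
func setWithFailover(resolve func() writer, key, value string) error {
	w := resolve()
	err := w.Set(key, value)
	if err != nil && strings.HasPrefix(err.Error(), "READONLY") {
		// Stale connection: we are talking to a demoted replica.
		// Ask Sentinel for the new master and retry once.
		w = resolve()
		return w.Set(key, value)
	}
	return err
}

// Fake clients: the first resolve returns a demoted replica, the
// second returns the new master.
type replica struct{}

func (replica) Set(string, string) error {
	return errors.New("READONLY You can't write against a read only replica.")
}

type master struct{}

func (master) Set(string, string) error { return nil }

func main() {
	calls := 0
	resolve := func() writer {
		calls++
		if calls == 1 {
			return replica{}
		}
		return master{}
	}
	fmt.Println(setWithFailover(resolve, "key2", "hello1")) // <nil>
}
```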
