Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize some code logic #202

Merged
merged 2 commits into from
Mar 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 31 additions & 15 deletions cmd/carina-node/run/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,30 +19,32 @@ package run
import (
"context"
"errors"
"github.com/carina-io/carina"
"github.com/carina-io/carina/runners"
"os"
"time"

"github.com/container-storage-interface/spec/lib/go/csi"
"google.golang.org/grpc"
corev1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"os"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"sigs.k8s.io/controller-runtime/pkg/metrics"
"time"

carinav1beta1 "github.com/carina-io/carina/api/v1beta1"

carinav1 "github.com/carina-io/carina/api/v1"
carinav1beta1 "github.com/carina-io/carina/api/v1beta1"
"github.com/carina-io/carina/controllers"
"github.com/carina-io/carina/pkg/csidriver/driver"
"github.com/carina-io/carina/pkg/csidriver/driver/k8s"
deviceManager "github.com/carina-io/carina/pkg/devicemanager"
carinaMetrics "github.com/carina-io/carina/pkg/metrics"
"github.com/container-storage-interface/spec/lib/go/csi"
"google.golang.org/grpc"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"github.com/carina-io/carina/runners"
// +kubebuilder:scaffold:imports
)

Expand Down Expand Up @@ -70,15 +72,29 @@ func subMain() error {
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
Scheme: scheme,
MetricsBindAddress: config.metricsAddr,
LeaderElection: false,
NewCache: cache.BuilderWithOptions(cache.Options{
Scheme: scheme,
SelectorsByObject: cache.SelectorsByObject{
&corev1.Node{}: {
Field: fields.SelectorFromSet(fields.Set{"metadata.name": nodeName}),
},
&corev1.Pod{}: {
Field: fields.SelectorFromSet(fields.Set{"spec.nodeName": nodeName}),
},
&carinav1beta1.NodeStorageResource{}: {
Field: fields.SelectorFromSet(fields.Set{"metadata.name": nodeName}),
},
},
}),
LeaderElection: false,
})
if err != nil {
setupLog.Error(err, "unable to start manager")
return err
}

// 初始化磁盘管理服务
dm := deviceManager.NewDeviceManager(nodeName, mgr.GetCache(), mgr.GetClient())
dm := deviceManager.NewDeviceManager(nodeName, mgr.GetCache())

// pod io controller
podIOController := controllers.NewPodIOReconciler(
Expand Down
44 changes: 20 additions & 24 deletions pkg/csidriver/filesystem/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ import (
"os"
"os/exec"
"path/filepath"
"regexp"
"strings"

"github.com/carina-io/carina/utils/log"

"golang.org/x/sys/unix"
"k8s.io/utils/io"

"github.com/carina-io/carina/utils/log"
)

const (
Expand All @@ -44,9 +44,16 @@ func isSameDevice(dev1, dev2 string) (bool, error) {

var st1, st2 unix.Stat_t
if err := Stat(dev1, &st1); err != nil {
if os.IsNotExist(err) {
return false, nil
}
return false, fmt.Errorf("stat failed for %s: %v", dev1, err)
}

if err := Stat(dev2, &st2); err != nil {
if os.IsNotExist(err) {
return false, nil
}
return false, fmt.Errorf("stat failed for %s: %v", dev2, err)
}

Expand All @@ -60,12 +67,13 @@ func IsMounted(device, target string) (bool, error) {
if err != nil {
return false, err
}

target, err = filepath.EvalSymlinks(abs)
if err != nil {
return false, err
}

data, err := os.ReadFile("/proc/mounts")
data, err := io.ConsistentRead("/proc/mounts", 3)
if err != nil {
return false, fmt.Errorf("could not read /proc/mounts: %v", err)
}
Expand All @@ -75,41 +83,29 @@ func IsMounted(device, target string) (bool, error) {
if len(fields) < 2 {
continue
}
//Intercept characters to determine that they belong to the same pod
podstr, err := getOneStringByRegex(fields[1], `/pods/([\w-]+)/`)

// If the filesystem is nfs(cephfs、ussfs etc) and its connection is broken, EvalSymlinks will be stuck.
// So it should be in before calling EvalSymlinks.
ok, err := isSameDevice(device, fields[0])
if err != nil {
return false, fmt.Errorf("could not read pods mountpath %s : %v", fields[1], err)
return false, err
}
if !strings.Contains(target, podstr) {
if !ok {
continue
}

d, err := filepath.EvalSymlinks(fields[1])
if err != nil {
return false, err
}
if d == target {
return isSameDevice(device, fields[0])
return true, nil
}
}

return false, nil
}

func getOneStringByRegex(str, rule string) (string, error) {
if !strings.Contains(str, "/pods/") {
return "non-csi", nil
}
reg, err := regexp.Compile(rule)
if reg == nil || err != nil {
return "", fmt.Errorf("regexp compile:" + err.Error())
}
result := reg.FindStringSubmatch(str)
if len(result) < 1 {
return "", fmt.Errorf("could not find sub str: %v", str)
}
return result[1], nil
}

// DetectFilesystem returns filesystem type if device has a filesystem.
// This returns an empty string if no filesystem exists.
func DetectFilesystem(device string) (string, error) {
Expand Down
13 changes: 6 additions & 7 deletions pkg/devicemanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,20 @@ package deviceManager

import (
"context"
"github.com/carina-io/carina/pkg/devicemanager/bcache"
"time"

corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/carina-io/carina/pkg/configuration"
"github.com/carina-io/carina/pkg/devicemanager/bcache"
"github.com/carina-io/carina/pkg/devicemanager/lvmd"
"github.com/carina-io/carina/pkg/devicemanager/partition"
"github.com/carina-io/carina/pkg/devicemanager/volume"
"github.com/carina-io/carina/utils/exec"
"github.com/carina-io/carina/utils/log"
"github.com/carina-io/carina/utils/mutx"
corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
)

type Trigger string
Expand All @@ -51,7 +52,6 @@ type VolumeEvent struct {

type DeviceManager struct {
Cache cache.Cache
client.Client
// Volume 操作
VolumeManager volume.LocalVolume
//磁盘以及分区操作
Expand All @@ -60,13 +60,12 @@ type DeviceManager struct {
noticeUpdates []chan *VolumeEvent
}

func NewDeviceManager(nodeName string, cache cache.Cache, client client.Client) *DeviceManager {
func NewDeviceManager(nodeName string, cache cache.Cache) *DeviceManager {
executor := &exec.CommandExecutor{}
mutex := mutx.NewGlobalLocks()

dm := DeviceManager{
Cache: cache,
Client: client,
VolumeManager: &volume.LocalVolumeImplement{Mutex: mutex, Lv: &lvmd.Lvm2Implement{Executor: executor}, Bcache: &bcache.BcacheImplement{Executor: executor}},
Partition: &partition.LocalPartitionImplement{Mutex: mutex, CacheParttionNum: make(map[string]uint), Executor: executor},
NodeName: nodeName,
Expand Down
Loading