Skip to content

Commit

Permalink
Implemented storage client
Browse files Browse the repository at this point in the history
Added client to interact with name/data nodes
  • Loading branch information
notzree committed Jan 26, 2025
1 parent 8157a4e commit aad8bcd
Show file tree
Hide file tree
Showing 9 changed files with 611 additions and 218 deletions.
194 changes: 194 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
package main

import (
"context"
"fmt"
"io"
"log"
"os"
"sync"
"time"

"github.com/notzree/richardstore/proto"
"google.golang.org/protobuf/types/known/emptypb"
)

type Client struct {
NameNodeAddr string
NameNodeClient *NameNodeClient
mu *sync.RWMutex
DataNodeMap map[string]*DataNodeClient
storer *Store
}

func NewClient(nameNodeAddr string, storer *Store) (*Client, error) {
nameNodeClient, err := NewNameNodeClient(nameNodeAddr)
if err != nil {
return nil, err
}
client := &Client{
NameNodeAddr: nameNodeAddr,
mu: &sync.RWMutex{},
DataNodeMap: make(map[string]*DataNodeClient),
storer: storer,
}
client.NameNodeClient = nameNodeClient
ctx := context.Background()
ctxWithTimeout, cancel := context.WithTimeout(ctx, 3*time.Second)
defer cancel()
info, err := client.NameNodeClient.client.Info(ctxWithTimeout, &emptypb.Empty{})
if err != nil {
return nil, err
}

for _, peerInfo := range info.DataNodes {
peerClient, err := NewDataNodeClient(peerInfo.Address)
if err != nil {
return nil, err
}
client.DataNodeMap[peerInfo.Address] = peerClient
}
return client, nil
}

func (c *Client) Close() error {
for _, peerClient := range c.DataNodeMap {
peerClient.Close()
}
c.NameNodeClient.Close()
return nil
}

type writeResult struct {
nodeIdx int
success bool
err error
}

func (c *Client) _write(file *os.File) (string, error) {
fileInfo, err := file.Stat()
if err != nil {
return "", fmt.Errorf("failed to get file info: %w", err)
}
size := uint64(fileInfo.Size())

ctx := context.Background()
ctxWithTimeout, cancel := context.WithTimeout(ctx, 3*time.Second)
defer cancel()

resp, err := c.NameNodeClient.client.CreateFile(ctxWithTimeout, &proto.CreateFileRequest{
Size: size,
MinReplicationFactor: 0.5,
})
log.Printf("data nodes: %s", resp.DataNodes)
if err != nil {
return "", err
}

expectedFileAddr, err := c.storer.CreateAddress(file)
if err != nil {
return "", err
}

_, err = file.Seek(0, io.SeekStart)
if err != nil {
return "", err
}

resultCh := make(chan writeResult, len(resp.DataNodes))
var wg sync.WaitGroup

for i, targetNode := range resp.DataNodes {
wg.Add(1)
go func(idx int, addr string) {
defer wg.Done()

c.mu.RLock()
peerClient, ok := c.DataNodeMap[addr]
c.mu.RUnlock()

if !ok {
resultCh <- writeResult{nodeIdx: idx, success: false, err: fmt.Errorf("node not found: %s", addr)}
return
}

stream, err := peerClient.client.WriteFile(ctxWithTimeout)
if err != nil {
resultCh <- writeResult{nodeIdx: idx, success: false, err: fmt.Errorf("failed to create stream: %w", err)}
return
}

err = stream.Send(&proto.WriteFileRequest{
Request: &proto.WriteFileRequest_FileInfo{
FileInfo: &proto.FileInfo{
Hash: expectedFileAddr.HashStr,
Size: size,
},
},
})
if err != nil {
resultCh <- writeResult{nodeIdx: idx, success: false, err: fmt.Errorf("failed to send file info: %w", err)}
return
}
log.Printf("sending first req")
buf := make([]byte, 1024)
for {
n, err := file.Read(buf)
if err == io.EOF {
break
}
if err != nil {
resultCh <- writeResult{nodeIdx: idx, success: false, err: fmt.Errorf("error reading file: %w", err)}
return
}

err = stream.Send(&proto.WriteFileRequest{
Request: &proto.WriteFileRequest_Chunk{
Chunk: buf[:n],
},
})
if err != nil {
resultCh <- writeResult{nodeIdx: idx, success: false, err: fmt.Errorf("failed to send chunk: %w", err)}
return
}
}

resp, err := stream.CloseAndRecv()
if err != nil {
resultCh <- writeResult{nodeIdx: idx, success: false, err: fmt.Errorf("error receiving response: %w", err)}
return
}

resultCh <- writeResult{nodeIdx: idx, success: resp.Success, err: nil}
}(i, targetNode.Address)
}

// Close result channel once all goroutines complete
go func() {
wg.Wait()
close(resultCh)
}()

// Collect results and check for failures
var errors []error
successCount := 0
for result := range resultCh {
if result.err != nil {
errors = append(errors, fmt.Errorf("node %d: %w", result.nodeIdx, result.err))
} else if result.success {
successCount++
}
}

if len(errors) > 0 {
return "", fmt.Errorf("write failed on some nodes: %v", errors)
}

if successCount == 0 {
return "", fmt.Errorf("write failed on all nodes")
}

return expectedFileAddr.HashStr, nil
}

//TODO: IMPLEMENT THE INFO RPC METHOD FOR THE NAME NODE.
64 changes: 64 additions & 0 deletions data-node.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,70 @@ func (node *DataNode) ReplicateFile(stream grpc.ClientStreamingServer[proto.Repl
}

func (node *DataNode) WriteFile(stream grpc.ClientStreamingServer[proto.WriteFileRequest, proto.WriteFileResponse]) error {
firstMsg, err := stream.Recv()
if err != nil {
return fmt.Errorf("failed to receive initial command: %w", err)
}
resp, ok := firstMsg.Request.(*proto.WriteFileRequest_FileInfo)
if !ok {
return fmt.Errorf("first message must be a command")
}

expectedFileHash := resp.FileInfo.Hash

// Create a pipe to connect the stream to an io.Reader
pr, pw := io.Pipe()

// Start a goroutine to read from the stream and write to the pipe
go func() {
defer pw.Close() // Make sure we close the writer when done

for {
msg, err := stream.Recv()
if err == io.EOF {
return
}
if err != nil {
pw.CloseWithError(fmt.Errorf("error receiving chunk: %w", err))
return
}

chunk, ok := msg.Request.(*proto.WriteFileRequest_Chunk)
if !ok {
pw.CloseWithError(fmt.Errorf("expected chunk message"))
return
}

_, err = pw.Write(chunk.Chunk)
if err != nil {
pw.CloseWithError(fmt.Errorf("error writing to pipe: %w", err))
return
}
}
}()

// Pass the reader end of the pipe to Write()
hash, err := node.Storer.Write(pr)
if err != nil {
return fmt.Errorf("failed to write file: %w", err)
}
if hash != expectedFileHash {
return fmt.Errorf("file hash mismatch expecte %s got %s", expectedFileHash, hash)
}

// TODO: Do we need to replicate from the datanode? or client alr does parallel stream
// if len(cmd.TargetNodes) > 0 {
// node.cmdChan <- &proto.Command{
// Command: &proto.Command_Replicate{
// Replicate: cmd,
// },
// }
// }
log.Printf("File replication completed for node %d", node.Id)
stream.SendAndClose(&proto.WriteFileResponse{
Success: true,
})

return nil
}

Expand Down
3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ module github.com/notzree/richardstore
go 1.23

require (
github.com/golang/protobuf v1.5.4
github.com/stretchr/testify v1.9.0
golang.org/x/sys v0.26.0
google.golang.org/grpc v1.69.2
google.golang.org/protobuf v1.36.0
)
Expand All @@ -13,7 +13,6 @@ require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
golang.org/x/net v0.30.0 // indirect
golang.org/x/sys v0.26.0 // indirect
golang.org/x/text v0.19.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241015192408-796eee8c2d53 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
55 changes: 33 additions & 22 deletions main.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
package main

import (
"os"
"strconv"
"strings"
"time"

"github.com/notzree/richardstore/proto"
)

func main() {
Expand Down Expand Up @@ -41,7 +39,14 @@ func main() {

nameNode.AddDataNodes(peerDataNodes)
go nameNode.Run()

//todo: remove the storer from the client or decompose storer into hasher + storer
client, err := NewClient(":3009", NewStore(StoreOpts{
blockSize: 5,
root: ":3009",
}))
if err != nil {
panic(err)
}
for i := 1; i < (NUMNODES + 1); i++ {
dataNodes[i].NameNode = *nameNode.PeerRepresentation()

Expand All @@ -56,27 +61,33 @@ func main() {
go dataNodes[i].Run()
}

testData := "momsbestpicture"
hash, err := dataNodes[1].Storer.Write(strings.NewReader(testData))
// testData := "momsbestpicture"
// hash, err := dataNodes[1].Storer.Write(strings.NewReader(testData))
// if err != nil {
// panic(err)
// }

// mockReplicationCmd := &proto.Command{
// Command: &proto.Command_Replicate{
// Replicate: &proto.ReplicateCommand{
// FileInfo: &proto.FileInfo{
// Hash: hash,
// Size: 0,
// GenerationStamp: uint64(time.Now().UnixNano()),
// },
// TargetNodes: []uint64{2, 3},
// },
// },
// }

time.Sleep(time.Second * 1)
// nameNode.Commands[1] = append(nameNode.Commands[1], mockReplicationCmd)
// dataNodes[1].SendHeartbeat()
file, err := os.Open("test_file")
if err != nil {
panic(err)
}

mockReplicationCmd := &proto.Command{
Command: &proto.Command_Replicate{
Replicate: &proto.ReplicateCommand{
FileInfo: &proto.FileInfo{
Hash: hash,
Size: 0,
GenerationStamp: uint64(time.Now().UnixNano()),
},
TargetNodes: []uint64{2, 3},
},
},
}
time.Sleep(time.Second * 1)
nameNode.Commands[1] = append(nameNode.Commands[1], mockReplicationCmd)
dataNodes[1].SendHeartbeat()
client._write(file)
time.Sleep(time.Second * 100)

}
20 changes: 20 additions & 0 deletions name-node.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/notzree/richardstore/proto"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/protobuf/types/known/emptypb"
)

type FileEntry struct {
Expand Down Expand Up @@ -376,3 +377,22 @@ func (node *NameNode) getCommands(id uint64) (cmds []*proto.Command, err error)
return commands, nil

}

func (node *NameNode) Info(ctx context.Context, _ *emptypb.Empty) (*proto.NameNodeInfo, error) {
node.dnMu.RLock()
defer node.dnMu.RUnlock()

dataNodes := make([]*proto.DataNodeInfo, 0, len(node.DataNodes))
for _, dn := range node.DataNodes {
if !dn.Alive {
continue
}
dataNodes = append(dataNodes, &proto.DataNodeInfo{
Address: dn.Address,
})
}

return &proto.NameNodeInfo{
DataNodes: dataNodes,
}, nil
}
Loading

0 comments on commit aad8bcd

Please sign in to comment.