Skip to content

Commit

Permalink
Add versioning to config, and support for data.car.from_pieces.piece_…
Browse files Browse the repository at this point in the history
…to_uri

Closes #80, #79
  • Loading branch information
gagliardetto committed Jan 28, 2024
1 parent b30e35f commit 056fe16
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 68 deletions.
2 changes: 1 addition & 1 deletion cmd-check-deals.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func newCmd_check_deals() *cli.Command {
epoch := *config.Epoch
isLassieMode := config.IsFilecoinMode()
isCarMode := !isLassieMode
if isCarMode && config.IsSplitCarMode() {
if isCarMode && config.IsCarFromPieces() {
klog.Infof("Checking pieces for epoch %d from %q", epoch, config.ConfigFilepath())

metadata, err := splitcarfetcher.MetadataFromYaml(string(config.Data.Car.FromPieces.Metadata.URI))
Expand Down
51 changes: 45 additions & 6 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,15 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
)

const ConfigVersion = 1

type URI string

// String() returns the URI as a string.
func (u URI) String() string {
return string(u)
}

// IsZero returns true if the URI is empty.
func (u URI) IsZero() bool {
return u == ""
Expand Down Expand Up @@ -93,10 +100,15 @@ func hashFileSha256(filePath string) (string, error) {
return fmt.Sprintf("%x", h.Sum(nil)), nil
}

type PieceURLInfo struct {
URI URI `json:"uri" yaml:"uri"` // URL to the piece.
}

type Config struct {
originalFilepath string
hashOfConfigFile string
Epoch *uint64 `json:"epoch" yaml:"epoch"`
Version *uint64 `json:"version" yaml:"version"`
Data struct {
Car *struct {
URI URI `json:"uri" yaml:"uri"`
Expand All @@ -107,6 +119,7 @@ type Config struct {
Deals struct {
URI URI `json:"uri" yaml:"uri"` // Local path to the deals file.
} `json:"deals" yaml:"deals"`
PieceToURI map[cid.Cid]PieceURLInfo `json:"piece_to_uri" yaml:"piece_to_uri"` // Map of piece CID to URL.
} `json:"from_pieces" yaml:"from_pieces"`
} `json:"car" yaml:"car"`
Filecoin *struct {
Expand Down Expand Up @@ -173,8 +186,12 @@ func (c *Config) IsFilecoinMode() bool {
return c.Data.Filecoin != nil && c.Data.Filecoin.Enable
}

func (c *Config) IsSplitCarMode() bool {
return c.Data.Car != nil && c.Data.Car.FromPieces != nil && !c.Data.Car.FromPieces.Metadata.URI.IsZero() && !c.Data.Car.FromPieces.Deals.URI.IsZero()
func (c *Config) IsCarFromPieces() bool {
if c.Data.Car == nil || c.Data.Car.FromPieces == nil {
return false
}
fromPieces := c.Data.Car.FromPieces
return !fromPieces.Metadata.URI.IsZero() && (!fromPieces.Deals.URI.IsZero() || len(fromPieces.PieceToURI) > 0)
}

type ConfigSlice []*Config
Expand Down Expand Up @@ -223,6 +240,12 @@ func (c *Config) Validate() error {
if c.Epoch == nil {
return fmt.Errorf("epoch must be set")
}
if c.Version == nil {
return fmt.Errorf("version must be set")
}
if *c.Version != ConfigVersion {
return fmt.Errorf("version must be %d", ConfigVersion)
}
// Distinguish between CAR-mode and Filecoin-mode.
// In CAR-mode, the data is fetched from a CAR file (local or remote).
// In Filecoin-mode, the data is fetched from Filecoin directly (by CID via Lassie).
Expand Down Expand Up @@ -254,12 +277,28 @@ func (c *Config) Validate() error {
}
}
{
if c.Data.Car.FromPieces.Deals.URI.IsZero() {
return fmt.Errorf("data.car.from_pieces.deals.uri must be set")
if c.Data.Car.FromPieces.Deals.URI.IsZero() && len(c.Data.Car.FromPieces.PieceToURI) == 0 {
return fmt.Errorf("data.car.from_pieces.deals.uri or data.car.from_pieces.piece_to_uri must be set")
}
if !c.Data.Car.FromPieces.Deals.URI.IsLocal() {
if !c.Data.Car.FromPieces.Deals.URI.IsZero() && len(c.Data.Car.FromPieces.PieceToURI) > 0 {
return fmt.Errorf("data.car.from_pieces.deals.uri and data.car.from_pieces.piece_to_uri cannot both be set")
}
if !c.Data.Car.FromPieces.Deals.URI.IsZero() && !c.Data.Car.FromPieces.Deals.URI.IsLocal() {
return fmt.Errorf("data.car.from_pieces.deals.uri must be a local file")
}
if len(c.Data.Car.FromPieces.PieceToURI) > 0 {
for pieceCID, uri := range c.Data.Car.FromPieces.PieceToURI {
if !pieceCID.Defined() {
return fmt.Errorf("data.car.from_pieces.piece_to_uri[%s] must be a valid CID", pieceCID)
}
if uri.URI.IsZero() {
return fmt.Errorf("data.car.from_pieces.piece_to_uri[%s].uri must be set", pieceCID)
}
if !uri.URI.IsRemoteWeb() {
return fmt.Errorf("data.car.from_pieces.piece_to_uri[%s].uri must be a remote web URI", pieceCID)
}
}
}
}
}
// CidToOffsetAndSize and CidToOffset cannot be both set or both unset.
Expand Down Expand Up @@ -340,7 +379,7 @@ func (c *Config) Validate() error {
if !c.Data.Car.FromPieces.Metadata.URI.IsValid() {
return fmt.Errorf("data.car.from_pieces.metadata.uri is invalid")
}
if !c.Data.Car.FromPieces.Deals.URI.IsValid() {
if !c.Data.Car.FromPieces.Deals.URI.IsZero() && !c.Data.Car.FromPieces.Deals.URI.IsValid() {
return fmt.Errorf("data.car.from_pieces.deals.uri is invalid")
}
} else {
Expand Down
144 changes: 83 additions & 61 deletions epoch.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,76 +283,95 @@ func NewEpochFromConfig(
var localCarReader *carv2.Reader
var remoteCarReader ReaderAtCloser
var err error
if config.IsSplitCarMode() {
if config.IsCarFromPieces() {

metadata, err := splitcarfetcher.MetadataFromYaml(string(config.Data.Car.FromPieces.Metadata.URI))
if err != nil {
return nil, fmt.Errorf("failed to read pieces metadata: %w", err)
}

dealRegistry, err := splitcarfetcher.DealsFromCSV(string(config.Data.Car.FromPieces.Deals.URI))
if err != nil {
return nil, fmt.Errorf("failed to read deals: %w", err)
}
isFromDeals := !config.Data.Car.FromPieces.Deals.URI.IsZero()

lotusAPIAddress := "https://api.node.glif.io"
cl := jsonrpc.NewClient(lotusAPIAddress)
dm := splitcarfetcher.NewMinerInfo(
cl,
5*time.Minute,
5*time.Second,
)
if isFromDeals {
dealRegistry, err := splitcarfetcher.DealsFromCSV(string(config.Data.Car.FromPieces.Deals.URI))
if err != nil {
return nil, fmt.Errorf("failed to read deals: %w", err)
}

scr, err := splitcarfetcher.NewSplitCarReader(metadata.CarPieces,
func(piece carlet.CarFile) (splitcarfetcher.ReaderAtCloserSize, error) {
minerID, ok := dealRegistry.GetMinerByPieceCID(piece.CommP)
if !ok {
return nil, fmt.Errorf("failed to find miner for piece CID %s", piece.CommP)
}
klog.Infof("piece CID %s is stored on miner %s", piece.CommP, minerID)
minerInfo, err := dm.GetProviderInfo(c.Context, minerID)
if err != nil {
return nil, fmt.Errorf("failed to get miner info for miner %s, for piece %s: %w", minerID, piece.CommP, err)
}
if len(minerInfo.Multiaddrs) == 0 {
return nil, fmt.Errorf("miner %s has no multiaddrs", minerID)
}
spew.Dump(minerInfo)
// extract the IP address from the multiaddr:
split := multiaddr.Split(minerInfo.Multiaddrs[0])
if len(split) < 2 {
return nil, fmt.Errorf("invalid multiaddr: %s", minerInfo.Multiaddrs[0])
}
component0 := split[0].(*multiaddr.Component)
component1 := split[1].(*multiaddr.Component)

var ip string
var port string

if component0.Protocol().Code == multiaddr.P_IP4 {
ip = component0.Value()
port = component1.Value()
} else if component1.Protocol().Code == multiaddr.P_IP4 {
ip = component1.Value()
port = component0.Value()
} else {
return nil, fmt.Errorf("invalid multiaddr: %s", minerInfo.Multiaddrs[0])
}
// reset the port to 80:
// TODO: use the appropriate port (80, better if 443 with TLS)
port = "80"
minerIP := fmt.Sprintf("%s:%s", ip, port)
klog.Infof("piece CID %s is stored on miner %s (%s)", piece.CommP, minerID, minerIP)
formattedURL := fmt.Sprintf("http://%s/piece/%s", minerIP, piece.CommP.String())
return splitcarfetcher.NewRemoteFileSplitCarReader(
piece.CommP.String(),
formattedURL,
)
})
if err != nil {
return nil, fmt.Errorf("failed to open CAR file from pieces: %w", err)
lotusAPIAddress := "https://api.node.glif.io"
cl := jsonrpc.NewClient(lotusAPIAddress)
dm := splitcarfetcher.NewMinerInfo(
cl,
5*time.Minute,
5*time.Second,
)

scr, err := splitcarfetcher.NewSplitCarReader(
metadata.CarPieces,
func(piece carlet.CarFile) (splitcarfetcher.ReaderAtCloserSize, error) {
minerID, ok := dealRegistry.GetMinerByPieceCID(piece.CommP)
if !ok {
return nil, fmt.Errorf("failed to find miner for piece CID %s", piece.CommP)
}
klog.Infof("piece CID %s is stored on miner %s", piece.CommP, minerID)
minerInfo, err := dm.GetProviderInfo(c.Context, minerID)
if err != nil {
return nil, fmt.Errorf("failed to get miner info for miner %s, for piece %s: %w", minerID, piece.CommP, err)
}
if len(minerInfo.Multiaddrs) == 0 {
return nil, fmt.Errorf("miner %s has no multiaddrs", minerID)
}
spew.Dump(minerInfo)
// extract the IP address from the multiaddr:
split := multiaddr.Split(minerInfo.Multiaddrs[0])
if len(split) < 2 {
return nil, fmt.Errorf("invalid multiaddr: %s", minerInfo.Multiaddrs[0])
}
component0 := split[0].(*multiaddr.Component)
component1 := split[1].(*multiaddr.Component)

var ip string
// TODO: use the appropriate port (80, better if 443 with TLS)
port := "80"

if component0.Protocol().Code == multiaddr.P_IP4 {
ip = component0.Value()
} else if component1.Protocol().Code == multiaddr.P_IP4 {
ip = component1.Value()
} else {
return nil, fmt.Errorf("invalid multiaddr: %s", minerInfo.Multiaddrs[0])
}
minerIP := fmt.Sprintf("%s:%s", ip, port)
klog.Infof("piece CID %s is stored on miner %s (%s)", piece.CommP, minerID, minerIP)
formattedURL := fmt.Sprintf("http://%s/piece/%s", minerIP, piece.CommP.String())
return splitcarfetcher.NewRemoteFileSplitCarReader(
piece.CommP.String(),
formattedURL,
)
})
if err != nil {
return nil, fmt.Errorf("failed to open CAR file from pieces: %w", err)
}
remoteCarReader = scr
} else {
// is from pieceToURL mapping:
scrFromURLs, err := splitcarfetcher.NewSplitCarReader(
metadata.CarPieces,
func(piece carlet.CarFile) (splitcarfetcher.ReaderAtCloserSize, error) {
pieceURL, ok := config.Data.Car.FromPieces.PieceToURI[piece.CommP]
if !ok {
return nil, fmt.Errorf("failed to find URL for piece CID %s", piece.CommP)
}
return splitcarfetcher.NewRemoteFileSplitCarReader(
piece.CommP.String(),
pieceURL.URI.String(),
)
})
if err != nil {
return nil, fmt.Errorf("failed to open CAR file from pieces: %w", err)
}
remoteCarReader = scrFromURLs
}
remoteCarReader = scr
} else {
localCarReader, remoteCarReader, err = openCarStorage(c.Context, string(config.Data.Car.URI))
if err != nil {
Expand Down Expand Up @@ -397,6 +416,9 @@ func NewEpochFromConfig(
headerSize := uint64(buf.Len())
ep.carHeaderSize = headerSize
}
if remoteCarReader == nil && localCarReader == nil {
return nil, fmt.Errorf("no CAR reader available")
}
}
{
sigExistsFile, err := openIndexStorage(
Expand Down

0 comments on commit 056fe16

Please sign in to comment.