Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(remote/pinning): Ls to take results channel instead of returning one #738

Merged
merged 5 commits into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ The following emojis are used to highlight certain changes:
- `routing/http/client`: creating delegated routing client with `New` now defaults to querying delegated routing server with `DefaultProtocolFilter` ([IPIP-484](https://github.com/ipfs/specs/pull/484)) [#689](https://github.com/ipfs/boxo/pull/689)
- `bitswap/client`: Wait at lease one broadcast interval before resending wants to a peer. Check for peers to rebroadcast to more often than one broadcast interval.
- No longer using `github.com/jbenet/goprocess` to avoid requiring in dependents. [#710](https://github.com/ipfs/boxo/pull/710)
- Refactor remote pinning `Ls` to take results channel instead of returning one. The previous `Ls` behavior is implemented by the GoLs function, which creates the channels, starts the goroutine that calls Ls, and returns the channels to the caller
lidel marked this conversation as resolved.
Show resolved Hide resolved

### Removed

Expand Down
130 changes: 72 additions & 58 deletions pinning/remote/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,77 +137,90 @@

type pinResults = openapi.PinResults

func (c *Client) Ls(ctx context.Context, opts ...LsOption) (chan PinStatusGetter, chan error) {
res := make(chan PinStatusGetter, 1)
errs := make(chan error, 1)

// Ls writes pin statuses to the PinStatusGetter channel. The channel is
// closed when there are no more pins. If an error occurs or ctx is canceled,
// then the channel is closed and an error is returned.
//
// Example:
//
// res := make(chan PinStatusGetter, 1)
// lsErr := make(chan error, 1)
// go func() {
// lsErr <- c.Ls(ctx, res, opts...)
// }()
// for r := range res {
// processPin(r)
// }
// return <-lsErr
func (c *Client) Ls(ctx context.Context, res chan<- PinStatusGetter, opts ...LsOption) (err error) {

Check warning on line 155 in pinning/remote/client/client.go

View check run for this annotation

Codecov / codecov/patch

pinning/remote/client/client.go#L155

Added line #L155 was not covered by tests
settings := new(lsSettings)
for _, o := range opts {
if err := o(settings); err != nil {
if err = o(settings); err != nil {

Check warning on line 158 in pinning/remote/client/client.go

View check run for this annotation

Codecov / codecov/patch

pinning/remote/client/client.go#L158

Added line #L158 was not covered by tests
close(res)
errs <- err
close(errs)
return res, errs
return err

Check warning on line 160 in pinning/remote/client/client.go

View check run for this annotation

Codecov / codecov/patch

pinning/remote/client/client.go#L160

Added line #L160 was not covered by tests
}
}

go func() {
defer func() {
if r := recover(); r != nil {
var err error
switch x := r.(type) {
case string:
err = fmt.Errorf("unexpected error while listing remote pins: %s", x)
case error:
err = fmt.Errorf("unexpected error while listing remote pins: %w", x)
default:
err = errors.New("unknown panic while listing remote pins")
}
errs <- err
}
close(errs)
close(res)
}()

for {
pinRes, err := c.lsInternal(ctx, settings)
if err != nil {
errs <- err
return
defer func() {
if r := recover(); r != nil {
switch x := r.(type) {
case string:
err = fmt.Errorf("unexpected error while listing remote pins: %s", x)
case error:
err = fmt.Errorf("unexpected error while listing remote pins: %w", x)
default:
err = errors.New("unknown panic while listing remote pins")

Check warning on line 172 in pinning/remote/client/client.go

View check run for this annotation

Codecov / codecov/patch

pinning/remote/client/client.go#L164-L172

Added lines #L164 - L172 were not covered by tests
}
}
close(res)

Check warning on line 175 in pinning/remote/client/client.go

View check run for this annotation

Codecov / codecov/patch

pinning/remote/client/client.go#L175

Added line #L175 was not covered by tests
}()

results := pinRes.GetResults()
for _, r := range results {
select {
case res <- &pinStatusObject{r}:
case <-ctx.Done():
errs <- ctx.Err()
return
}
}
for {
pinRes, err := c.lsInternal(ctx, settings)
if err != nil {
return err
}

Check warning on line 182 in pinning/remote/client/client.go

View check run for this annotation

Codecov / codecov/patch

pinning/remote/client/client.go#L178-L182

Added lines #L178 - L182 were not covered by tests

batchSize := len(results)
if int(pinRes.Count) == batchSize {
// no more batches
return
results := pinRes.GetResults()
for _, r := range results {
select {
case res <- &pinStatusObject{r}:
case <-ctx.Done():
return ctx.Err()

Check warning on line 189 in pinning/remote/client/client.go

View check run for this annotation

Codecov / codecov/patch

pinning/remote/client/client.go#L184-L189

Added lines #L184 - L189 were not covered by tests
}
}

// Better DX/UX for cases like https://github.com/application-research/estuary/issues/124
if batchSize == 0 && int(pinRes.Count) != 0 {
errs <- fmt.Errorf("invalid pinning service response: PinResults.count=%d but no PinResults.results", int(pinRes.Count))
return
}
batchSize := len(results)
if int(pinRes.Count) == batchSize {
// no more batches
return nil
}

Check warning on line 197 in pinning/remote/client/client.go

View check run for this annotation

Codecov / codecov/patch

pinning/remote/client/client.go#L193-L197

Added lines #L193 - L197 were not covered by tests

oldestResult := results[batchSize-1]
settings.before = &oldestResult.Created
// Better DX/UX for cases like https://github.com/application-research/estuary/issues/124
if batchSize == 0 && int(pinRes.Count) != 0 {
return fmt.Errorf("invalid pinning service response: PinResults.count=%d but no PinResults.results", int(pinRes.Count))

Check warning on line 201 in pinning/remote/client/client.go

View check run for this annotation

Codecov / codecov/patch

pinning/remote/client/client.go#L200-L201

Added lines #L200 - L201 were not covered by tests
}

oldestResult := results[batchSize-1]
settings.before = &oldestResult.Created

Check warning on line 205 in pinning/remote/client/client.go

View check run for this annotation

Codecov / codecov/patch

pinning/remote/client/client.go#L204-L205

Added lines #L204 - L205 were not covered by tests
}
}

// GoLs creates the results and error channels, starts the goroutine that calls
// Ls, and returns the channels to the caller.
func (c *Client) GoLs(ctx context.Context, opts ...LsOption) (<-chan PinStatusGetter, <-chan error) {
res := make(chan PinStatusGetter)
errs := make(chan error, 1)

go func() {
errs <- c.Ls(ctx, res, opts...)

Check warning on line 216 in pinning/remote/client/client.go

View check run for this annotation

Codecov / codecov/patch

pinning/remote/client/client.go#L211-L216

Added lines #L211 - L216 were not covered by tests
}()

return res, errs
}

func (c *Client) LsSync(ctx context.Context, opts ...LsOption) ([]PinStatusGetter, error) {
resCh, errCh := c.Ls(ctx, opts...)
resCh, errCh := c.GoLs(ctx, opts...)

Check warning on line 223 in pinning/remote/client/client.go

View check run for this annotation

Codecov / codecov/patch

pinning/remote/client/client.go#L223

Added line #L223 was not covered by tests

var res []PinStatusGetter
for r := range resCh {
Expand All @@ -219,8 +232,6 @@

// Manual version of Ls that returns a single batch of results and int with total count
func (c *Client) LsBatchSync(ctx context.Context, opts ...LsOption) ([]PinStatusGetter, int, error) {
var res []PinStatusGetter

settings := new(lsSettings)
for _, o := range opts {
if err := o(settings); err != nil {
Expand All @@ -233,9 +244,13 @@
return nil, 0, err
}

var res []PinStatusGetter

Check warning on line 247 in pinning/remote/client/client.go

View check run for this annotation

Codecov / codecov/patch

pinning/remote/client/client.go#L247

Added line #L247 was not covered by tests
results := pinRes.GetResults()
for _, r := range results {
res = append(res, &pinStatusObject{r})
if len(results) != 0 {
res = make([]PinStatusGetter, len(results))
for i, r := range results {
res[i] = &pinStatusObject{r}
}

Check warning on line 253 in pinning/remote/client/client.go

View check run for this annotation

Codecov / codecov/patch

pinning/remote/client/client.go#L249-L253

Added lines #L249 - L253 were not covered by tests
}

return res, int(pinRes.Count), nil
Expand Down Expand Up @@ -274,8 +289,7 @@
// TODO: Ignoring HTTP Response OK?
results, httpresp, err := getter.Execute()
if err != nil {
err := httperr(httpresp, err)
return pinResults{}, err
return pinResults{}, httperr(httpresp, err)

Check warning on line 292 in pinning/remote/client/client.go

View check run for this annotation

Codecov / codecov/patch

pinning/remote/client/client.go#L292

Added line #L292 was not covered by tests
}

return results, nil
Expand Down
Loading