Skip to content

Commit

Permalink
fix: CombineSortedChain
Browse files Browse the repository at this point in the history
- Combine multiple sorted channels into a single sorted channel using a priority queue.
- Refactor `GetTopKItems` and `GetSmallestNItems` in `heap.go` to use a priority queue.
- Split the inner priority queue into two structs, `innerPriorityQ` and `PriorityQ`, in `priorityq.go`.
- Add unit tests for all the edge cases of the `CombineSortedChain` function.
- Benchmark the `CombineSortedChain` function.
  • Loading branch information
Laisky committed Dec 26, 2023
1 parent c67ee7c commit 08726fd
Show file tree
Hide file tree
Showing 4 changed files with 317 additions and 39 deletions.
7 changes: 5 additions & 2 deletions algorithm/heap.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"slices"

"github.com/Laisky/errors/v2"

"github.com/Laisky/go-utils/v4/common"
)

Expand Down Expand Up @@ -53,7 +54,9 @@ func GetTopKItems[T common.Sortable](

q := NewPriorityQ[T](heapSort)
for v := range inputChan {
q.Push(v)
q.Push(PriorityItem[T]{
Val: v,
})
if q.Len() > topN {
q.Pop()
}
Expand All @@ -62,7 +65,7 @@ func GetTopKItems[T common.Sortable](
result = make([]T, 0, topN)
for q.Len() != 0 {
it := q.Pop()
result = append(result, it)
result = append(result, it.GetVal())
}

return result, nil
Expand Down
47 changes: 35 additions & 12 deletions algorithm/priorityq.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,44 @@ type PriorityQ[T common.Sortable] struct {
q *innerPriorityQ[T]
}

// PriotiryItemItf priority item interface
type PriotiryItemItf[T common.Sortable] interface {
GetVal() T
}

// PriorityItem priority item
type PriorityItem[T common.Sortable] struct {
// Val T
Val T
// Name whatever to identify this item
Name any
}

// GetVal get value of priority item
func (t PriorityItem[T]) GetVal() T {
return t.Val
}

// NewPriorityQ create new PriorityQ
//
// Args:
// - order: common.SortOrderAsc or common.SortOrderDesc,
// if you want to get topN items, use common.SortOrderDesc,
// if you want to get bottomN items, use common.SortOrderAsc.
func NewPriorityQ[T common.Sortable](order common.SortOrder) *PriorityQ[T] {
return &PriorityQ[T]{
q: newPriorityQueue[T](order),
q: newinnerPriorityQ[T](order),
}
}

// Push push item into priority queue
func (pq *PriorityQ[T]) Push(v T) {
func (pq *PriorityQ[T]) Push(v PriotiryItemItf[T]) {
heap.Push(pq.q, v)
}

// Pop pop item from priority queue
func (pq *PriorityQ[T]) Pop() T {
return heap.Pop(pq.q).(T)
func (pq *PriorityQ[T]) Pop() PriotiryItemItf[T] {
return heap.Pop(pq.q).(PriotiryItemItf[T]) //nolint:forcetypeassert // panic
}

// Len return length of priority queue
Expand All @@ -36,7 +59,7 @@ func (pq *PriorityQ[T]) Len() int {
}

// Peek peek item from priority queue
func (pq *PriorityQ[T]) Peek() T {
func (pq *PriorityQ[T]) Peek() PriotiryItemItf[T] {
return pq.q.vals[len(pq.q.vals)-1]
}

Expand Down Expand Up @@ -85,16 +108,16 @@ func (pq *PriorityQ[T]) Peek() T {
//
// Do not use this structure directly, use `NewPriorityQueue` instead.
type innerPriorityQ[T common.Sortable] struct {
vals []T
vals []PriotiryItemItf[T]
order common.SortOrder
}

// newPriorityQueue create new PriorityQ
// newinnerPriorityQ create new PriorityQ
//
// https://pkg.go.dev/container/[email protected]#example-package-IntHeap
func newPriorityQueue[T common.Sortable](order common.SortOrder) *innerPriorityQ[T] {
func newinnerPriorityQ[T common.Sortable](order common.SortOrder) *innerPriorityQ[T] {
return &innerPriorityQ[T]{
vals: []T{},
vals: []PriotiryItemItf[T]{},
order: order,
}
}
Expand All @@ -105,9 +128,9 @@ func (pq *innerPriorityQ[T]) Len() int { return len(pq.vals) }
// Less compare two items in heapq
func (pq *innerPriorityQ[T]) Less(i, j int) bool {
if pq.order == common.SortOrderAsc {
return pq.vals[i] < pq.vals[j]
return pq.vals[i].GetVal() < pq.vals[j].GetVal()
} else {
return pq.vals[i] > pq.vals[j]
return pq.vals[i].GetVal() > pq.vals[j].GetVal()
}
}

Expand All @@ -119,7 +142,7 @@ func (pq *innerPriorityQ[T]) Swap(i, j int) {
}

func (pq *innerPriorityQ[T]) Push(v any) {
pq.vals = append(pq.vals, v.(T))
pq.vals = append(pq.vals, v.(PriotiryItemItf[T])) //nolint:forcetypeassert // panic
}

// Pop pop item from heapq
Expand Down
68 changes: 43 additions & 25 deletions utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -1431,46 +1431,64 @@ func CombineSortedChain[T Sortable](sortOrder common.SortOrder, chans ...chan T)
return chans[0], nil
}

isHighest := sortOrder == common.SortOrderAsc
heap, err := algorithm.NewLimitSizeHeap[T](len(chans), isHighest)
if err != nil {
return nil, errors.Wrapf(err, "new heap with size %d, isHighest %v", len(chans), isHighest)
heap := algorithm.NewPriorityQ[T](sortOrder)

activeChans := make(map[int]chan T, len(chans))
for i, ch := range chans {
activeChans[i] = ch
}

result = make(chan T)
go func() {
defer close(result)

closedChans := make(map[int]struct{})
for idx, c := range activeChans {
v, ok := <-c
if !ok {
continue
}

heap.Push(algorithm.PriorityItem[T]{
Val: v,
Name: idx,
})
}

for {
for idx, c := range chans {
if _, ok := closedChans[idx]; ok {
continue
}
if heap.Len() == 0 {
return
}

it := heap.Pop()
result <- it.GetVal()

v, ok := <-c
if !ok {
closedChans[idx] = struct{}{}
if len(closedChans) == len(chans) {
idx := it.(algorithm.PriorityItem[T]).Name.(int) //nolint:forcetypeassert // panic
ch, ok := activeChans[idx]
if !ok { // this chan is already exhausted and removed
continue
}

for i := 0; i < len(chans); i++ {
it := heap.Push(latestIt)
if it == nil {
return
}
v, ok := <-ch
if !ok { // this chan is exhausted
delete(activeChans, idx)

result <- it.GetKey().(T)
}
// there is no active chans
if len(activeChans) == 0 {
for i := 0; i < heap.Len(); i++ {
it := heap.Pop()
result <- it.GetVal()
}

continue
return
}

it := heap.Push(algorithm.NewHeapComparableItem[T](v))
if it != nil {
result <- it.GetKey().(T)
}
continue
}

heap.Push(algorithm.PriorityItem[T]{
Val: v,
Name: idx,
})
}
}()

Expand Down
Loading

0 comments on commit 08726fd

Please sign in to comment.