Skip to content

Commit

Permalink
Merge pull request #5 from reugn/develop
Browse files Browse the repository at this point in the history
v0.4.0
  • Loading branch information
reugn authored Jun 12, 2022
2 parents ee40c65 + 07131f5 commit bb6e6f3
Show file tree
Hide file tree
Showing 11 changed files with 136 additions and 135 deletions.
9 changes: 1 addition & 8 deletions .golangci.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
run:
skip-dirs:
- generic
- benchmarks

linters:
disable-all: true
Expand All @@ -15,20 +15,14 @@ linters:
- gocyclo
- gofmt
- goimports
- gosimple
- govet
- ineffassign
- lll
- misspell
- prealloc
- revive
- staticcheck
- structcheck
- stylecheck
- typecheck
- unconvert
- unparam
- unused
- varcheck

issues:
Expand All @@ -37,4 +31,3 @@ issues:
- path: _test\.go
linters:
- errcheck
- unparam
4 changes: 1 addition & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,10 @@ The implemented patterns were taken from Scala and Java.
## Overview
* **Future** - A placeholder object for a value that may not yet exist.
* **Promise** - While futures are defined as a type of read-only placeholder object created for a result which doesn’t yet exist, a promise can be thought of as a writable, single-assignment container, which completes a future.
* **Task** - A data type for controlling possibly lazy and asynchronous computations.
* **Reentrant Lock** - Mutex that allows goroutines to enter into the lock on a resource more than once.
* **Optimistic Lock** - Mutex that allows optimistic reading. Could be retried or switched to RLock in case of failure. Significantly improves performance in case of frequent reads and short writes. See [benchmarks](./benchmarks/README.md).

### [Generic types](./generic)
* **Task** - A data type for controlling possibly lazy and asynchronous computations.

## Examples
Can be found in the examples directory/tests.

Expand Down
14 changes: 9 additions & 5 deletions examples/future/main.go
Original file line number Diff line number Diff line change
@@ -1,23 +1,27 @@
package main

import (
"fmt"
"log"
"time"

"github.com/reugn/async"
)

func main() {
future := asyncAction()
rt, _ := future.Get()
fmt.Println(rt)
result, err := future.Get()
if err != nil {
log.Fatal(err)
}
log.Print(result)
}

func asyncAction() async.Future {
promise := async.NewPromise()
func asyncAction() async.Future[string] {
promise := async.NewPromise[string]()
go func() {
time.Sleep(time.Second)
promise.Success("OK")
}()

return promise.Future()
}
76 changes: 41 additions & 35 deletions future.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,66 +4,70 @@ import "sync"

// Future represents a value which may or may not currently be available,
// but will be available at some point, or an error if that value could not be made available.
type Future interface {
type Future[T any] interface {

// Map creates a new Future by applying a function to the successful result of this Future.
Map(func(interface{}) (interface{}, error)) Future
Map(func(T) (T, error)) Future[T]

// FlatMap creates a new Future by applying a function to the successful result of
// this Future.
FlatMap(func(interface{}) (Future, error)) Future
FlatMap(func(T) (Future[T], error)) Future[T]

// Get blocks until the Future is completed and returns either a result or an error.
Get() (interface{}, error)
Get() (T, error)

// Recover handles any error that this Future might contain using a resolver function.
Recover(func() (interface{}, error)) Future
Recover(func() (T, error)) Future[T]

// RecoverWith handles any error that this Future might contain using another Future.
RecoverWith(Future) Future
RecoverWith(Future[T]) Future[T]

// complete completes the Future with either a value or an error.
// Is used by Promise internally.
complete(interface{}, error)
complete(T, error)
}

// FutureImpl implements the Future interface.
type FutureImpl struct {
acc sync.Once
compl sync.Once
done chan interface{}
value interface{}
err error
type FutureImpl[T any] struct {
acceptOnce sync.Once
completeOnce sync.Once
done chan interface{}
value T
err error
}

// Verify FutureImpl satisfies the Future interface.
var _ Future[any] = (*FutureImpl[any])(nil)

// NewFuture returns a new Future.
func NewFuture() Future {
return &FutureImpl{
func NewFuture[T any]() Future[T] {
return &FutureImpl[T]{
done: make(chan interface{}),
}
}

// accept blocks once, until the Future result is available.
func (fut *FutureImpl) accept() {
fut.acc.Do(func() {
func (fut *FutureImpl[T]) accept() {
fut.acceptOnce.Do(func() {
sig := <-fut.done
switch v := sig.(type) {
case error:
fut.err = v
default:
fut.value = v
fut.value = v.(T)
}
})
}

// Map creates a new Future by applying a function to the successful result of this Future
// and returns the result of the function as a new Future.
func (fut *FutureImpl) Map(f func(interface{}) (interface{}, error)) Future {
next := NewFuture()
func (fut *FutureImpl[T]) Map(f func(T) (T, error)) Future[T] {
next := NewFuture[T]()
go func() {
fut.accept()
if fut.err != nil {
next.complete(nil, fut.err)
var nilT T
next.complete(nilT, fut.err)
} else {
next.complete(f(fut.value))
}
Expand All @@ -73,16 +77,18 @@ func (fut *FutureImpl) Map(f func(interface{}) (interface{}, error)) Future {

// FlatMap creates a new Future by applying a function to the successful result of
// this Future and returns the result of the function as a new Future.
func (fut *FutureImpl) FlatMap(f func(interface{}) (Future, error)) Future {
next := NewFuture()
func (fut *FutureImpl[T]) FlatMap(f func(T) (Future[T], error)) Future[T] {
next := NewFuture[T]()
go func() {
fut.accept()
if fut.err != nil {
next.complete(nil, fut.err)
var nilT T
next.complete(nilT, fut.err)
} else {
tfut, terr := f(fut.value)
if terr != nil {
next.complete(nil, terr)
var nilT T
next.complete(nilT, terr)
} else {
next.complete(tfut.Get())
}
Expand All @@ -92,15 +98,15 @@ func (fut *FutureImpl) FlatMap(f func(interface{}) (Future, error)) Future {
}

// Get blocks until the Future is completed and returns either a result or an error.
func (fut *FutureImpl) Get() (interface{}, error) {
func (fut *FutureImpl[T]) Get() (T, error) {
fut.accept()
return fut.value, fut.err
}

// Recover handles any error that this Future might contain using a given resolver function.
// Returns the result as a new Future.
func (fut *FutureImpl) Recover(f func() (interface{}, error)) Future {
next := NewFuture()
func (fut *FutureImpl[T]) Recover(f func() (T, error)) Future[T] {
next := NewFuture[T]()
go func() {
fut.accept()
if fut.err != nil {
Expand All @@ -114,8 +120,8 @@ func (fut *FutureImpl) Recover(f func() (interface{}, error)) Future {

// RecoverWith handles any error that this Future might contain using another Future.
// Returns the result as a new Future.
func (fut *FutureImpl) RecoverWith(rf Future) Future {
next := NewFuture()
func (fut *FutureImpl[T]) RecoverWith(rf Future[T]) Future[T] {
next := NewFuture[T]()
go func() {
fut.accept()
if fut.err != nil {
Expand All @@ -128,13 +134,13 @@ func (fut *FutureImpl) RecoverWith(rf Future) Future {
}

// complete completes the Future with either a value or an error.
func (fut *FutureImpl) complete(v interface{}, e error) {
fut.compl.Do(func() {
func (fut *FutureImpl[T]) complete(value T, err error) {
fut.completeOnce.Do(func() {
go func() {
if e != nil {
fut.done <- e
if err != nil {
fut.done <- err
} else {
fut.done <- v
fut.done <- value
}
}()
})
Expand Down
41 changes: 23 additions & 18 deletions future_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,21 @@ import (
)

func TestFuture(t *testing.T) {
p := NewPromise()
p := NewPromise[bool]()
go func() {
time.Sleep(time.Millisecond * 100)
p.Success(true)
}()
v, e := p.Future().Get()
internal.AssertEqual(t, v.(bool), true)

internal.AssertEqual(t, v, true)
internal.AssertEqual(t, e, nil)
}

func TestFutureUtils(t *testing.T) {
p1 := NewPromise()
p2 := NewPromise()
p3 := NewPromise()
p1 := NewPromise[int]()
p2 := NewPromise[int]()
p3 := NewPromise[int]()
go func() {
time.Sleep(time.Millisecond * 100)
p1.Success(1)
Expand All @@ -31,54 +32,58 @@ func TestFutureUtils(t *testing.T) {
time.Sleep(time.Millisecond * 300)
p3.Success(3)
}()
arr := []Future{p1.Future(), p2.Future(), p3.Future()}
arr := []Future[int]{p1.Future(), p2.Future(), p3.Future()}
res := []interface{}{1, 2, 3}
futRes, _ := FutureSeq(arr).Get()

internal.AssertEqual(t, res, futRes)
}

func TestFutureFirstCompleted(t *testing.T) {
p := NewPromise()
p := NewPromise[bool]()
go func() {
time.Sleep(time.Millisecond * 1000)
p.Success(true)
}()
timeout := FutureTimer(time.Millisecond * 100)
timeout := FutureTimer[bool](time.Millisecond * 100)
futRes, futErr := FutureFirstCompletedOf(p.Future(), timeout).Get()
internal.AssertEqual(t, nil, futRes)

internal.AssertEqual(t, false, futRes)
if futErr == nil {
t.Fatalf("futErr is nil")
}
}

func TestFutureTransform(t *testing.T) {
p1 := NewPromise()
p1 := NewPromise[int]()
go func() {
time.Sleep(time.Millisecond * 100)
p1.Success(1)
}()
res, _ := p1.Future().Map(func(v interface{}) (interface{}, error) {
return v.(int) + 1, nil
}).FlatMap(func(v interface{}) (Future, error) {
nv := v.(int) + 1
p2 := NewPromise()
res, _ := p1.Future().Map(func(v int) (int, error) {
return v + 1, nil
}).FlatMap(func(v int) (Future[int], error) {
nv := v + 1
p2 := NewPromise[int]()
p2.Success(nv)
return p2.Future(), nil
}).Recover(func() (interface{}, error) {
}).Recover(func() (int, error) {
return 5, nil
}).Get()

internal.AssertEqual(t, 3, res)
}

func TestFutureFailure(t *testing.T) {
p1 := NewPromise()
p2 := NewPromise()
p1 := NewPromise[int]()
p2 := NewPromise[int]()
go func() {
time.Sleep(time.Millisecond * 100)
p1.Failure(errors.New("Future error"))
time.Sleep(time.Millisecond * 200)
p2.Success(2)
}()
res, _ := p1.Future().RecoverWith(p2.Future()).Get()

internal.AssertEqual(t, 2, res)
}
23 changes: 12 additions & 11 deletions future_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ import (
"time"
)

// FutureSeq reduces many Futures into a single Future
func FutureSeq(futures []Future) Future {
next := NewFuture()
// FutureSeq reduces many Futures into a single Future.
func FutureSeq[T any](futures []Future[T]) Future[[]interface{}] {
next := NewFuture[[]interface{}]()
go func() {
seq := make([]interface{}, len(futures))
for i, f := range futures {
Expand All @@ -25,26 +25,27 @@ func FutureSeq(futures []Future) Future {

// FutureFirstCompletedOf asynchronously returns a new Future to the result of the first Future
// in the list that is completed. This means no matter if it is completed as a success or as a failure.
func FutureFirstCompletedOf(futures ...Future) Future {
next := NewFuture()
func FutureFirstCompletedOf[T any](futures ...Future[T]) Future[T] {
next := NewFuture[T]()
go func() {
for _, f := range futures {
go func(future Future) {
go func(future Future[T]) {
next.complete(future.Get())
}(f)
}
}()
return next
}

// FutureTimer returns Future that will have been resolved after given duration
// useful for FutureFirstCompletedOf for timeout purposes
func FutureTimer(d time.Duration) Future {
next := NewFuture()
// FutureTimer returns Future that will have been resolved after given duration;
// useful for FutureFirstCompletedOf for timeout purposes.
func FutureTimer[T any](d time.Duration) Future[T] {
next := NewFuture[T]()
go func() {
timer := time.NewTimer(d)
<-timer.C
next.complete(nil, fmt.Errorf("FutureTimer %v timeout", d))
var nilT T
next.complete(nilT, fmt.Errorf("FutureTimer %v timeout", d))
}()
return next
}
Loading

0 comments on commit bb6e6f3

Please sign in to comment.