Skip to content

Commit

Permalink
Add
Browse files Browse the repository at this point in the history
  • Loading branch information
jiribenes committed Nov 22, 2024
1 parent 3ba0081 commit a541b08
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 3 deletions.
3 changes: 3 additions & 0 deletions examples/stdlib/stream/zip.check
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
1, a
2, b
3, c
12 changes: 12 additions & 0 deletions examples/stdlib/stream/zip.effekt
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import stream

def main() = {
def stream1() = [1, 2, 3].each
def stream2() = array::fromList(["a", "b", "c", "d"]).each

with val tup = for[(Int, String)] {
zip[Int, String] { stream1 } { stream2 }
}
val (a, b) = tup
println(show(a) ++ ", " ++ show(b))
}
29 changes: 26 additions & 3 deletions libraries/common/stream.effekt
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,17 @@ def exhaustively[A] { program: () => A / stop } { action: A => Unit }: Unit =
def stop() = ()
}

/// Run `program` forever until `stop` is thrown.
def exhaustively[A] { program: () => A / stop }: Unit =
try {
def go(): Unit = {
program()
go()
}
go()
} with stop {
def stop() = ()
}

// In Effekt lower bounds are inclusive and upper bounds are exclusive

Expand Down Expand Up @@ -216,7 +227,7 @@ def collectBytes[R] { stream: () => R / emit[Byte] }: (R, ByteArray) = {
def collectBytes { stream: () => Unit / emit[Byte] }: ByteArray =
collectBytes[Unit]{stream}.second

def feed[T](list: List[T]) { reader: () => Unit / read[T] } = {
def feed[T](list: List[T]) { reader: () => Unit / read[T] }: Unit = {
var l = list
try {
reader()
Expand All @@ -232,7 +243,7 @@ def feed[T](list: List[T]) { reader: () => Unit / read[T] } = {
}
}

def feed[T](array: Array[T]) { reader: () => Unit / read[T] } = {
def feed[T](array: Array[T]) { reader: () => Unit / read[T] }: Unit = {
var i = 0
try {
reader()
Expand All @@ -247,7 +258,7 @@ def feed[T](array: Array[T]) { reader: () => Unit / read[T] } = {
}
}

def feed(bytes: ByteArray) { reader: () => Unit / read[Byte] } = {
def feed(bytes: ByteArray) { reader: () => Unit / read[Byte] }: Unit = {
var i = 0
try {
reader()
Expand Down Expand Up @@ -282,6 +293,18 @@ def source[A, R] { stream: () => Unit / emit[A] } { reader: () => R / read[A] }:
}
}

/// Combines two streams together producing a stream of pairs in lockstep.
/// Given two streams of length `n` and `m`, it produces a stream of length `min(n, m)`.
def zip[A, B] { stream1: () => Unit / emit[A] } { stream2: () => Unit / emit[B] }: Unit / emit[(A, B)] = {
with source[A, Unit] { stream1 }
with source[B, Unit] { stream2 }

exhaustively {
val a = do read[A]()
val b = do read[B]()
do emit((a, b))
}
}

def writeFile[R](path: String) { stream: () => R / emit[Byte] }: R / Exception[IOError] = {

Expand Down

0 comments on commit a541b08

Please sign in to comment.