Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion core/src/main/scala/ox/flow/Flow.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ import scala.annotation.nowarn
*
* A flow is lazy - evaluation happens only when it's run.
*
* Flows can be created using the [[Flow.usingSink]], [[Flow.fromValues]] and other `Flow.from*` methods, [[Flow.tick]] etc.
* Flows can be created using the [[Flow.usingEmit]], [[Flow.usingChannel]], [[Flow.fromValues]] and other `Flow.from*` methods,
* [[Flow.tick]] etc.
*
* Transformation stages can be added using the available combinators, such as [[Flow.map]], [[Flow.buffer]], [[Flow.grouped]], etc. Each
* such method returns a new immutable `Flow` instance.
Expand Down
21 changes: 21 additions & 0 deletions core/src/main/scala/ox/flow/FlowCompanionOps.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import ox.Fork
import ox.channels.ChannelClosed
import ox.channels.ChannelClosedUnion.isValue
import ox.channels.Source
import ox.channels.Sink
import ox.channels.BufferCapacity
import ox.forever
import ox.forkUnsupervised
Expand Down Expand Up @@ -37,6 +38,26 @@ trait FlowCompanionOps:
*/
def usingEmit[T](withEmit: FlowEmit[T] => Unit): Flow[T] = usingEmitInline(withEmit)

/** Creates a flow, which when run, provides a [[Sink]] (channel) to the given `withSink` function. Elements can be sent to the sink to be
* processed by downstream stages. The `withSink` function is run asynchronously in a forked task.
*
* The flow completes when the `withSink` function completes and the provided sink is closed. The sink is automatically closed when
* `withSink` completes normally. If `withSink` throws an exception, the sink is closed with an error.
*
* Must be run within a concurrency scope as a fork is created to run the `withSink` function.
*
* @param withSink
* A function that receives a [[Sink]] to which elements can be sent.
*/
def usingChannel[T](withSink: Sink[T] => Unit)(using BufferCapacity, ox.OxUnsupervised): Flow[T] = usingEmitInline: emit =>
val ch = BufferCapacity.newChannel[T]
val _ = forkUnsupervised:
try
withSink(ch)
ch.doneOrClosed().discard // the channel might be already closed by `withSink`
catch case e: Throwable => ch.errorOrClosed(e).discard // the channel might be already closed by `withSink`
FlowEmit.channelToEmit(ch, emit)

/** Creates a flow using the given `source`. An element is emitted for each value received from the source. If the source is completed
* with an error, is it propagated by throwing.
*/
Expand Down
74 changes: 74 additions & 0 deletions core/src/test/scala/ox/flow/FlowOpsUsingChannelTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package ox.flow

import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import ox.*

class FlowOpsUsingChannelTest extends AnyFlatSpec with Matchers:
behavior of "usingChannel"

it should "send elements through the provided channel" in supervised:
Flow
.usingChannel(sink =>
sink.send(1)
sink.send(2)
sink.send(3)
)
.runToList() shouldBe List(1, 2, 3)

it should "propagate errors from the channel" in supervised:
val exception = new RuntimeException("test error")
val result = intercept[ox.channels.ChannelClosedException.Error]:
Flow
.usingChannel[Int](sink =>
sink.send(1)
throw exception
)
.runToList()
result.cause shouldBe exception

it should "work with transformations" in supervised:
Flow
.usingChannel[Int](sink =>
sink.send(1)
sink.send(2)
sink.send(3)
)
.map(_ * 2)
.runToList() shouldBe List(2, 4, 6)

it should "support concurrent sending" in supervised:
Flow
.usingChannel[Int](sink =>
val f1 = fork:
sink.send(1)
sink.send(2)
val f2 = fork:
sink.send(3)
sink.send(4)
f1.join()
f2.join()
)
.runToList()
.sorted shouldBe List(1, 2, 3, 4)

it should "handle channel closed by withSink with done()" in supervised:
Flow
.usingChannel[Int](sink =>
sink.send(1)
sink.send(2)
sink.done()
)
.runToList() shouldBe List(1, 2)

it should "handle channel closed by withSink with error()" in supervised:
val exception = new RuntimeException("explicit error")
val result = intercept[ox.channels.ChannelClosedException.Error]:
Flow
.usingChannel[Int](sink =>
sink.send(1)
sink.error(exception)
)
.runToList()
result.cause shouldBe exception
end FlowOpsUsingChannelTest
20 changes: 20 additions & 0 deletions doc/streaming/flows.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,26 @@ As part of the callback, you can create [supervision scopes](../structured-concu

Any asynchronous communication should be best done with [channels](channels.md). You can then manually forward any elements received from a channel to `emit`, or use e.g. `FlowEmit.channelToEmit`.

Alternatively, flows can be created by providing a function that receives a `Sink` (channel):

```scala mdoc:compile-only
import ox.flow.Flow
import ox.{fork, supervised}

supervised:
Flow.usingChannel: sink =>
sink.send(1)
fork:
sink.send(2)
sink.send(3)
sink.send(4)
// TODO: transform the flow further & run
```

Unlike `usingEmit`, the `Sink` instance can be safely shared across threads, as channels are thread-safe. The provided function is run asynchronously in a forked task. The flow completes when the function completes and the sink is automatically closed. If the function throws an exception, it is propagated as a flow error.

Note that `Flow.usingChannel` must be run within a concurrency scope, as it creates a fork to run the provided function.

## Transforming flows: basics

Multiple transformation stages can be added to a flow, each time returning a new `Flow` instance, describing the extended pipeline. As before, no elements are emitted or transformed until the flow is run, as flows are lazy. There's a number of pre-defined transformation stages, many of them similar in function to corresponding methods on Scala's collections:
Expand Down