Skip to content

Commit

Permalink
feat(workflows): add ability to send direct signal to a workflow name…
Browse files Browse the repository at this point in the history
… + tags
  • Loading branch information
MasterPtato committed Jan 24, 2025
1 parent 9a43f01 commit ff931bf
Show file tree
Hide file tree
Showing 7 changed files with 147 additions and 25 deletions.
68 changes: 61 additions & 7 deletions packages/common/chirp-workflow/core/src/builder/common/signal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ use uuid::Uuid;

use crate::{
builder::BuilderError, db::DatabaseHandle, error::WorkflowError, metrics, signal::Signal,
workflow::Workflow,
};

pub struct SignalBuilder<T: Signal + Serialize> {
db: DatabaseHandle,
ray_id: Uuid,
body: T,
to_workflow_name: Option<&'static str>,
to_workflow_id: Option<Uuid>,
tags: serde_json::Map<String, serde_json::Value>,
error: Option<BuilderError>,
Expand All @@ -23,13 +25,14 @@ impl<T: Signal + Serialize> SignalBuilder<T> {
db,
ray_id,
body,
to_workflow_name: None,
to_workflow_id: None,
tags: serde_json::Map::new(),
error: None,
}
}

pub fn to_workflow(mut self, workflow_id: Uuid) -> Self {
pub fn to_workflow_id(mut self, workflow_id: Uuid) -> Self {
if self.error.is_some() {
return self;
}
Expand All @@ -39,6 +42,16 @@ impl<T: Signal + Serialize> SignalBuilder<T> {
self
}

pub fn to_workflow<W: Workflow>(mut self) -> Self {
if self.error.is_some() {
return self;
}

self.to_workflow_name = Some(W::NAME);

self
}

pub fn tags(mut self, tags: serde_json::Value) -> Self {
if self.error.is_some() {
return self;
Expand Down Expand Up @@ -81,16 +94,41 @@ impl<T: Signal + Serialize> SignalBuilder<T> {
.map_err(WorkflowError::SerializeSignalBody)
.map_err(GlobalError::raw)?;

match (self.to_workflow_id, self.tags.is_empty()) {
(Some(workflow_id), true) => {
tracing::debug!(signal_name=%T::NAME, to_workflow_id=%workflow_id, %signal_id, "dispatching signal");
match (
self.to_workflow_name,
self.to_workflow_id,
self.tags.is_empty(),
) {
(Some(workflow_name), None, _) => {
tracing::debug!(
signal_name=%T::NAME,
to_workflow_name=%workflow_name,
tags=?self.tags,
%signal_id,
"dispatching signal via workflow name and tags"
);

let workflow_id = self
.db
.find_workflow(workflow_name, &serde_json::Value::Object(self.tags))
.await?
.ok_or(WorkflowError::WorkflowNotFound)
.map_err(GlobalError::raw)?;

self.db
.publish_signal(self.ray_id, workflow_id, signal_id, T::NAME, &input_val)
.await
.map_err(GlobalError::raw)?;
}
(None, Some(workflow_id), true) => {
tracing::debug!(signal_name=%T::NAME, to_workflow_id=%workflow_id, %signal_id, "dispatching signal via workflow id");

self.db
.publish_signal(self.ray_id, workflow_id, signal_id, T::NAME, &input_val)
.await
.map_err(GlobalError::raw)?;
}
(None, false) => {
(None, None, false) => {
tracing::debug!(signal_name=%T::NAME, tags=?self.tags, %signal_id, "dispatching tagged signal");

self.db
Expand All @@ -104,8 +142,24 @@ impl<T: Signal + Serialize> SignalBuilder<T> {
.await
.map_err(GlobalError::raw)?;
}
(Some(_), false) => return Err(BuilderError::WorkflowIdAndTags.into()),
(None, true) => return Err(BuilderError::NoWorkflowIdOrTags.into()),
(Some(_), Some(_), _) => {
return Err(BuilderError::InvalidSignalSend(
"cannot provide both workflow and workflow id",
)
.into())
}
(None, Some(_), false) => {
return Err(BuilderError::InvalidSignalSend(
"cannot provide tags if providing a workflow id",
)
.into())
}
(None, None, true) => {
return Err(BuilderError::InvalidSignalSend(
"no workflow, workflow id, or tags provided",
)
.into())
}
}

metrics::SIGNAL_PUBLISHED
Expand Down
6 changes: 2 additions & 4 deletions packages/common/chirp-workflow/core/src/builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,8 @@ pub mod workflow;
pub(crate) enum BuilderError {
#[error("tags must be a JSON map")]
TagsNotMap,
#[error("cannot call `to_workflow` and set tags on the same signal")]
WorkflowIdAndTags,
#[error("must call `to_workflow` or set tags on signal")]
NoWorkflowIdOrTags,
#[error("invalid signal send: {0}")]
InvalidSignalSend(&'static str),
#[error("cannot dispatch a workflow/signal from an operation within a workflow execution. trigger it from the workflow's body")]
CannotDispatchFromOpInWorkflow,
#[error("using tags on a sub workflow ({0}) with `.output()` is not supported")]
Expand Down
82 changes: 74 additions & 8 deletions packages/common/chirp-workflow/core/src/builder/workflow/signal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@ use uuid::Uuid;

use crate::{
builder::BuilderError, ctx::WorkflowCtx, error::WorkflowError, history::cursor::HistoryResult,
metrics, signal::Signal,
metrics, signal::Signal, workflow::Workflow,
};

pub struct SignalBuilder<'a, T: Signal + Serialize> {
ctx: &'a mut WorkflowCtx,
version: usize,

body: T,
to_workflow_name: Option<&'static str>,
to_workflow_id: Option<Uuid>,
tags: serde_json::Map<String, serde_json::Value>,
error: Option<BuilderError>,
Expand All @@ -26,13 +27,14 @@ impl<'a, T: Signal + Serialize> SignalBuilder<'a, T> {
version,

body,
to_workflow_name: None,
to_workflow_id: None,
tags: serde_json::Map::new(),
error: None,
}
}

pub fn to_workflow(mut self, workflow_id: Uuid) -> Self {
pub fn to_workflow_id(mut self, workflow_id: Uuid) -> Self {
if self.error.is_some() {
return self;
}
Expand All @@ -42,6 +44,16 @@ impl<'a, T: Signal + Serialize> SignalBuilder<'a, T> {
self
}

pub fn to_workflow<W: Workflow>(mut self) -> Self {
if self.error.is_some() {
return self;
}

self.to_workflow_name = Some(W::NAME);

self
}

pub fn tags(mut self, tags: serde_json::Value) -> Self {
if self.error.is_some() {
return self;
Expand Down Expand Up @@ -111,15 +123,53 @@ impl<'a, T: Signal + Serialize> SignalBuilder<'a, T> {
.map_err(WorkflowError::SerializeSignalBody)
.map_err(GlobalError::raw)?;

match (self.to_workflow_id, self.tags.is_empty()) {
(Some(workflow_id), true) => {
match (
self.to_workflow_name,
self.to_workflow_id,
self.tags.is_empty(),
) {
(Some(workflow_name), None, _) => {
tracing::debug!(
name=%self.ctx.name(),
id=%self.ctx.workflow_id(),
signal_name=%T::NAME,
to_workflow_name=%workflow_name,
%signal_id,
"dispatching signal via workflow name and tags"
);

let workflow_id = self
.ctx
.db()
.find_workflow(workflow_name, &serde_json::Value::Object(self.tags))
.await?
.ok_or(WorkflowError::WorkflowNotFound)
.map_err(GlobalError::raw)?;

self.ctx
.db()
.publish_signal_from_workflow(
self.ctx.workflow_id(),
&location,
self.version,
self.ctx.ray_id(),
workflow_id,
signal_id,
T::NAME,
&input_val,
self.ctx.loop_location(),
)
.await
.map_err(GlobalError::raw)?;
}
(None, Some(workflow_id), true) => {
tracing::debug!(
name=%self.ctx.name(),
id=%self.ctx.workflow_id(),
signal_name=%T::NAME,
to_workflow_id=%workflow_id,
%signal_id,
"dispatching signal"
"dispatching signal via workflow id"
);

self.ctx
Expand All @@ -138,7 +188,7 @@ impl<'a, T: Signal + Serialize> SignalBuilder<'a, T> {
.await
.map_err(GlobalError::raw)?;
}
(None, false) => {
(None, None, false) => {
tracing::debug!(
name=%self.ctx.name(),
id=%self.ctx.workflow_id(),
Expand All @@ -164,8 +214,24 @@ impl<'a, T: Signal + Serialize> SignalBuilder<'a, T> {
.await
.map_err(GlobalError::raw)?;
}
(Some(_), false) => return Err(BuilderError::WorkflowIdAndTags.into()),
(None, true) => return Err(BuilderError::NoWorkflowIdOrTags.into()),
(Some(_), Some(_), _) => {
return Err(BuilderError::InvalidSignalSend(
"cannot provide both workflow and workflow id",
)
.into())
}
(None, Some(_), false) => {
return Err(BuilderError::InvalidSignalSend(
"cannot provide tags if providing a workflow id",
)
.into())
}
(None, None, true) => {
return Err(BuilderError::InvalidSignalSend(
"no workflow, workflow id, or tags provided",
)
.into())
}
}

metrics::SIGNAL_PUBLISHED
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,9 @@ impl Database for DatabaseFdbSqliteNats {
}
}

/// Returns the first workflow with the given name and tags, first meaning the one with the lowest uuid
/// value (interpreted as u128) because its in a KV store. There is no way to get any other workflow
/// besides the first.
async fn find_workflow(
&self,
workflow_name: &str,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ CREATE TABLE workflow_activity_errors (
location BLOB PRIMARY KEY, -- JSONB
activity_name TEXT NOT NULL,
error TEXT NOT NULL,
ts INT NOT NULL
ts INT NOT NULL,
FOREIGN KEY (location) REFERENCES workflow_activity_events (location)
) STRICT;

-- Signal events
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ const MIGRATIONS_DIR: Dir =
include_dir!("$CARGO_MANIFEST_DIR/src/db/fdb_sqlite_nats/sqlite/migrations");

lazy_static::lazy_static! {
// We use a lazy static because this nly needs to be processed once
// We use a lazy static because this only needs to be processed once
static ref MIGRATIONS: Migrations = Migrations::new();
}

Expand Down
8 changes: 4 additions & 4 deletions packages/common/pools/src/utils/sql_query_macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,8 @@ macro_rules! __sql_query {

// Acquire connection
$crate::__sql_query_metrics_acquire!(_acquire);
let crdb = $driver;
let mut conn = $crate::__sql_acquire!($ctx, crdb);
let driver = $driver;
let mut conn = $crate::__sql_acquire!($ctx, driver);

// Execute query
$crate::__sql_query_metrics_start!($ctx, execute, _acquire, _start);
Expand Down Expand Up @@ -184,8 +184,8 @@ macro_rules! __sql_query_as {

// Acquire connection
$crate::__sql_query_metrics_acquire!(_acquire);
let crdb = $driver;
let mut conn = $crate::__sql_acquire!($ctx, crdb);
let driver = $driver;
let mut conn = $crate::__sql_acquire!($ctx, driver);

// Execute query
$crate::__sql_query_metrics_start!($ctx, $action, _acquire, _start);
Expand Down

0 comments on commit ff931bf

Please sign in to comment.