# Code of Conduct

The FlowDSL community is committed to being a welcoming, inclusive, and harassment-free environment for everyone, regardless of experience level, gender, gender identity and expression, sexual orientation, disability, personal appearance, body size, race, ethnicity, age, religion, or nationality.

## Our standards

**Examples of behavior that contributes to a positive environment:**

- Being respectful of differing viewpoints and experiences
- Gracefully accepting constructive criticism
- Focusing on what is best for the community and the project
- Showing empathy toward other community members
- Using welcoming and inclusive language

**Examples of unacceptable behavior:**

- Harassment, insults, or derogatory comments (public or private)
- Publishing others' private information without explicit permission
- Trolling or deliberately inflammatory comments
- Sustained disruption of discussions
- Any conduct that would reasonably be considered inappropriate in a professional setting

## Scope

This Code of Conduct applies in all community spaces, including:

- GitHub issues, discussions, and pull requests across the `flowdsl` organization
- The FlowDSL Discord server
- Any other official community channels

## Enforcement

Instances of abusive, harassing, or otherwise unacceptable behavior may be reported by contacting the project maintainers at ****. All complaints will be reviewed and investigated promptly and fairly. Maintainers are obligated to maintain confidentiality with regard to the reporter.

Maintainers who do not follow or enforce the Code of Conduct in good faith may face temporary or permanent repercussions as determined by the project leadership.
## Enforcement guidelines

Maintainers will use these guidelines when determining consequences:

| Severity | Example | Consequence |
| --- | --- | --- |
| 1 — Correction | Unwelcoming language | Private written warning, clarification of why the behavior was inappropriate |
| 2 — Warning | Single incident of harassment | Warning with consequences for continued behavior; no interaction with those involved for a specified period |
| 3 — Temporary ban | Sustained inappropriate behavior | Temporary ban from community spaces |
| 4 — Permanent ban | Pattern of harassment, threats, or targeted attacks | Permanent ban from all community spaces |

## Attribution

This Code of Conduct is adapted from the [Contributor Covenant](https://www.contributor-covenant.org){rel="nofollow"}, version 2.1.

## Next steps

- [Contributing Guide](https://flowdsl.com/docs/community/contributing) — how to contribute code and docs
- [Community](https://flowdsl.com/docs/community) — ways to get involved

# Contributing

All FlowDSL repositories are open source (Apache 2.0) and welcome contributions. This guide covers how to set up your environment, write good PRs, and navigate the review process.

## Before you start

- Check existing [issues](https://github.com/flowdsl/spec/issues){rel="nofollow"} and [discussions](https://github.com/flowdsl/spec/discussions){rel="nofollow"} to avoid duplicate work
- For significant changes, open a discussion or issue first to align on direction before writing code
- Read the [Code of Conduct](https://flowdsl.com/docs/community/code-of-conduct) — all contributors are expected to follow it

## Contributing to the specification

The spec lives at [github.com/flowdsl/spec](https://github.com/flowdsl/spec){rel="nofollow"}.
It consists of:

- `schema/flowdsl.schema.json` — the canonical JSON Schema
- `docs/` — specification prose
- `examples/` — valid example flow documents used in tests

### Setup

```bash
git clone https://github.com/flowdsl/spec
cd spec
npm install
```

### Validate your schema changes

```bash
# Run schema tests (validates all examples/ against the schema)
npm test

# Check a specific example
npx ajv validate -s schema/flowdsl.schema.json -d examples/order-fulfillment.flowdsl.json
```

### PR checklist for spec changes

- Update `schema/flowdsl.schema.json`
- Add or update at least one example in `examples/` that exercises the new field
- Update the relevant prose doc in `docs/`
- `npm test` passes
- PR description explains the motivation and links to any relevant RFC discussion

## Contributing to the Go SDK

The Go SDK lives at [github.com/flowdsl/flowdsl-go](https://github.com/flowdsl/flowdsl-go){rel="nofollow"}.

### Setup

```bash
git clone https://github.com/flowdsl/flowdsl-go
cd flowdsl-go
go mod download
```

### Run tests

```bash
go test ./...

# With race detector
go test -race ./...

# Integration tests (requires Docker)
make test-integration
```

### Code style

- `gofmt` and `golangci-lint` are enforced in CI
- Run `make lint` before opening a PR
- Keep the `NodeHandler` interface stable — changes require a major version bump

## Contributing to the Python SDK

The Python SDK lives at [github.com/flowdsl/flowdsl-py](https://github.com/flowdsl/flowdsl-py){rel="nofollow"}.

### Setup

```bash
git clone https://github.com/flowdsl/flowdsl-py
cd flowdsl-py
python -m venv .venv
source .venv/bin/activate
pip install -e ".[dev]"
```

### Run tests

```bash
pytest
pytest --cov=flowdsl --cov-report=term-missing
```

### Code style

```bash
ruff check .
ruff format .
mypy flowdsl/
```

## Contributing to Studio

Studio lives at [github.com/flowdsl/studio](https://github.com/flowdsl/studio){rel="nofollow"}. Built with React 18, TypeScript, React Flow, and Zustand.
### Setup

```bash
git clone https://github.com/flowdsl/studio
cd studio
npm install
npm run dev
# Open http://localhost:5173
```

### Run tests

```bash
npm test     # Vitest unit tests
npm run e2e  # Playwright end-to-end tests
```

### Component conventions

- Components live in `src/components/`
- The `NodeContractCard` component is the signature UI — preserve its bilateral contract layout
- Keep canvas state in Zustand, not local component state
- Do not add runtime communication logic to Studio — it is a document editor only

## Contributing to the website (docs)

The website lives at [github.com/flowdsl/website](https://github.com/flowdsl/website){rel="nofollow"}. Built with NuxtJS 4 and @nuxt/content.

### Setup

```bash
git clone https://github.com/flowdsl/website
cd website
npm install
npm run dev
# Open http://localhost:3000
```

### Adding or editing docs

Documentation lives in `spec/docs/` as Markdown files. Each file has frontmatter:

```yaml
---
title: My Page
description: One-line description.
weight: 350
---
```

The `weight` field controls the order in prev/next navigation. Leave gaps between weights (e.g., 300, 310, 320) so new pages can be inserted without renumbering.

### Docs style guide

- Use `##` for top-level sections (the page H1 is the title)
- Code blocks must specify a language: ` ```yaml `, ` ```go `, ` ```bash `
- Use callout syntax for tips and warnings:

  ```text
  ::callout{type="tip"}
  Your tip here.
  ::
  ```

- Link to related docs with relative paths: `[Delivery Modes](/docs/concepts/delivery-modes)`
- End every page with a "Next steps" section

## Opening a pull request

1. Fork the repository and create a branch: `git checkout -b my-feature`
2. Make your changes with tests
3. Run the linter and test suite
4. Push and open a PR against `main`
5. Fill in the PR template — describe the change and link the issue

PRs are reviewed by maintainers within a few business days. Small, focused PRs with clear descriptions are reviewed faster.
## Next steps

- [Community](https://flowdsl.com/docs/community) — other ways to get involved
- [Code of Conduct](https://flowdsl.com/docs/community/code-of-conduct) — community standards

# Community

FlowDSL is open source (Apache 2.0) and community-driven. The specification, Studio editor, SDKs, and reference runtime are all developed in the open at [github.com/flowdsl](https://github.com/flowdsl){rel="nofollow"}.

## Ways to participate

### Report a bug or request a feature

Open an issue on the relevant repository:

| Repository | Use for |
| --- | --- |
| [flowdsl/spec](https://github.com/flowdsl/spec/issues){rel="nofollow"} | Spec bugs, schema questions, docs, new field proposals |
| [flowdsl/studio](https://github.com/flowdsl/studio/issues){rel="nofollow"} | Visual editor bugs and feature requests |
| [flowdsl/flowdsl-go](https://github.com/flowdsl/flowdsl-go/issues){rel="nofollow"} | Go runtime and SDK issues |
| [flowdsl/flowdsl-py](https://github.com/flowdsl/flowdsl-py/issues){rel="nofollow"} | Python SDK issues |
| [flowdsl/flowdsl-js](https://github.com/flowdsl/flowdsl-js/issues){rel="nofollow"} | JavaScript/TypeScript SDK issues |

### Ask a question

- **GitHub Discussions** — [github.com/flowdsl/spec/discussions](https://github.com/flowdsl/spec/discussions){rel="nofollow"} — the primary place for open-ended questions and design discussions
- **Discord** — [discord.gg/flowdsl](https://discord.gg/MUjXSwGbUY){rel="nofollow"} — real-time chat with maintainers and community members

### Propose a spec change

Major spec changes go through an **RFC** (Request for Comments) process:

1. Open a GitHub Discussion in [flowdsl/spec](https://github.com/flowdsl/spec/discussions){rel="nofollow"} with the `RFC` label
2. Describe the problem, the proposed change, and any alternatives considered
3. The community and maintainers discuss for at least 2 weeks
4. If accepted, an issue is opened and a PR is authored against the spec

Minor additions (new optional fields, clarifications) can be proposed directly as a pull request.

### Contribute code

See the [Contributing Guide](https://flowdsl.com/docs/community/contributing) for how to set up your development environment and open a pull request.

### Share your flows

Publish your FlowDSL examples in the [flowdsl/examples](https://github.com/flowdsl/examples){rel="nofollow"} repository. Real-world examples help the community understand what is possible and how to model different domains.

## Governance

FlowDSL is currently maintained by a small core team. The roadmap is public at [github.com/flowdsl/spec/projects](https://github.com/flowdsl/spec/projects){rel="nofollow"}. Major decisions are made through the RFC process.

## Code of conduct

All community spaces (GitHub, Discord, discussions) follow the [FlowDSL Code of Conduct](https://flowdsl.com/docs/community/code-of-conduct). The short version: be respectful, be constructive, be welcoming.

## Stay updated

- **GitHub releases** — watch [flowdsl/spec](https://github.com/flowdsl/spec/releases){rel="nofollow"} for spec releases
- **Changelog** — [github.com/flowdsl/spec/blob/main/CHANGELOG.md](https://github.com/flowdsl/spec/blob/main/CHANGELOG.md){rel="nofollow"}
- **Blog** — [flowdsl.com/blog](https://flowdsl.com/blog){rel="nofollow"} — announcements and deep-dives

## Next steps

- [Contributing Guide](https://flowdsl.com/docs/community/contributing) — how to contribute code and docs
- [Code of Conduct](https://flowdsl.com/docs/community/code-of-conduct) — community standards
- [Migration](https://flowdsl.com/docs/migration) — upgrade guides between spec versions

# Core Concepts

## The flow graph

A FlowDSL document describes a **directed acyclic graph (DAG)**. Nodes are the vertices; edges are the directed connections between them.
```text
ValidateOrder → ChargePayment → FulfillOrder → NotifyCustomer
```

The JSON/YAML document is always the source of truth. The visual canvas is a projection.

## Nodes

A node is a unit of business logic. It has no knowledge of how its output is delivered — that is the edge's responsibility.

```yaml
nodes:
  ChargePayment:
    operationId: charge_payment
    description: Charges the customer's payment method
```

**Naming conventions:**

- Node component names → `PascalCase`
- `operationId` values → `snake_case`

## Edges

An edge connects two nodes and carries a **delivery policy**.

```yaml
edges:
  - from: ChargePayment
    to: FulfillOrder
    delivery:
      mode: checkpoint
      packet: "asyncapi#/components/messages/PaymentConfirmed"
```

The `delivery.mode` field determines the transport and durability guarantee.

## Packets

A packet is an AsyncAPI message reference. FlowDSL never duplicates message schemas — it references them:

```yaml
packet: "asyncapi#/components/messages/OrderFulfilled"
```

## The runtime

The FlowDSL runtime reads the graph definition and:

1. Starts each node's handler
2. Wires up the transport layer according to each edge's delivery policy
3. Handles retries, checkpointing, and replay automatically

Your business logic never touches transport code.

# Checkpoints

The `checkpoint` delivery mode snapshots pipeline state to MongoDB after each successful node execution. If the runtime crashes or a node fails mid-pipeline, execution resumes from the last saved checkpoint rather than restarting from the beginning. This is essential for long, multi-stage pipelines where early stages are expensive to re-run.

## How checkpointing works

```mermaid
flowchart LR
    A[ExtractText] -->|"checkpoint\n✓ saved"| B[ChunkDocument]
    B -->|"checkpoint\n✓ saved"| C[EmbedChunks]
    C -->|"checkpoint\n✗ failed"| D[LlmSummarize]
```

In the diagram above:

1. `ExtractText` runs and its output is saved to MongoDB — checkpoint 1 ✓
2. `ChunkDocument` runs and its output is saved — checkpoint 2 ✓
3. `EmbedChunks` runs and its output is saved — checkpoint 3 ✓
4. `LlmSummarize` fails — the runtime retries from checkpoint 3, not from `ExtractText`

Without checkpoints, a failure at step 4 would restart the entire pipeline from step 1, re-extracting and re-chunking an expensive PDF.

## Configuring checkpoint edges

```yaml
edges:
  - from: ExtractText
    to: ChunkDocument
    delivery:
      mode: checkpoint
      packet: ExtractedText
      checkpointInterval: 1   # Save after every packet (default)

  - from: ChunkDocument
    to: EmbedChunks
    delivery:
      mode: checkpoint
      packet: DocumentChunks
      batchSize: 50           # Batch 50 packets before checkpoint

  - from: EmbedChunks
    to: LlmSummarize
    delivery:
      mode: durable           # Switch to durable for the expensive LLM step
      packet: EmbeddedChunks
      idempotencyKey: "{{payload.documentId}}-summarize"
```

### Checkpoint-specific fields

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| `checkpointInterval` | integer | 1 | Save a checkpoint every N packets. |
| `batchSize` | integer | 1 | Process N packets before writing the checkpoint. |

## Complete pipeline example

A document intelligence pipeline using checkpoints throughout:

```yaml
flowdsl: "1.0"
info:
  title: Document Intelligence Pipeline
  version: "1.0.0"

nodes:
  ReceiveDocument:
    operationId: receive_document
    kind: source
    outputs:
      out: { packet: DocumentUpload }
  ExtractText:
    operationId: extract_pdf_text
    kind: transform
    inputs:
      in: { packet: DocumentUpload }
    outputs:
      out: { packet: ExtractedText }
  ChunkDocument:
    operationId: chunk_document
    kind: transform
    inputs:
      in: { packet: ExtractedText }
    outputs:
      out: { packet: DocumentChunks }
  EmbedChunks:
    operationId: embed_chunks
    kind: action
    inputs:
      in: { packet: DocumentChunks }
    outputs:
      out: { packet: EmbeddedChunks }
  LlmSummarize:
    operationId: llm_summarize
    kind: llm
    inputs:
      in: { packet: EmbeddedChunks }
    outputs:
      out: { packet: Summary }
  IndexDocument:
    operationId: index_document
    kind: action
    inputs:
      in: { packet: Summary }

edges:
  - from: ReceiveDocument
    to: ExtractText
    delivery:
      mode: direct
      packet: DocumentUpload
  - from: ExtractText
    to: ChunkDocument
    delivery:
      mode: checkpoint
      packet: ExtractedText
  - from: ChunkDocument
    to: EmbedChunks
    delivery:
      mode: checkpoint
      packet: DocumentChunks
      batchSize: 20
  - from: EmbedChunks
    to: LlmSummarize
    delivery:
      mode: durable
      packet: EmbeddedChunks
      idempotencyKey: "{{payload.documentId}}-summarize"
      retryPolicy:
        maxAttempts: 3
        backoff: exponential
        initialDelay: PT5S
  - from: LlmSummarize
    to: IndexDocument
    delivery:
      mode: durable
      packet: Summary
      idempotencyKey: "{{payload.documentId}}-index"
```

Note the switch from `checkpoint` to `durable` at the LLM step — the LLM call is expensive and non-deterministic, so it gets packet-level durability and idempotency, not just stage-level checkpointing.

## Performance considerations

Each packet crossing a `checkpoint` edge incurs a MongoDB write. This adds latency — typically 2–10ms per checkpoint — compared to `direct`'s microseconds.
**When to use checkpoints:**

- Multi-stage ETL pipelines where each stage takes seconds
- Pipelines processing large documents or batches
- Any pipeline where restarting from scratch is expensive

**When NOT to use checkpoints:**

- Short pipelines (< 3 stages) — the overhead is not worth it
- Real-time streaming where latency matters more than restart safety — use `direct` or `ephemeral`
- Individual stages that are fast and cheap — batch them under a single checkpoint

## Checkpoint IDs

The runtime generates a checkpoint ID from the flow ID, node ID, packet ID, and a timestamp:

```text
{flowId}:{nodeId}:{packetId}:{timestamp}
```

These IDs are stored in MongoDB under the `{flowId}.checkpoints` collection and can be inspected via the runtime API.

## Summary

- `checkpoint` edges save pipeline state to MongoDB after each node.
- A failed pipeline resumes from the last checkpoint, not from the beginning.
- Use `checkpoint` for expensive multi-stage pipelines; use `direct` for cheap transforms.
- Switch to `durable` at LLM steps — they need packet-level guarantees, not just stage-level.

## Next steps

- [Delivery Modes](https://flowdsl.com/docs/concepts/delivery-modes) — the full comparison of all five modes
- [LLM Flows](https://flowdsl.com/docs/guides/llm-flows) — checkpoint + durable patterns for AI pipelines
- [High-Throughput Pipelines](https://flowdsl.com/docs/guides/high-throughput-pipelines) — performance tuning

# Delivery Modes

Delivery modes are the most important concept in FlowDSL. Every edge in a flow declares a delivery mode that determines how packets travel from the source node to the destination node — including durability guarantees, transport infrastructure, replay behavior, and latency characteristics.

The runtime enforces these guarantees. You declare the mode; the runtime handles the implementation.
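The declaration itself is always the same small block of YAML regardless of mode. As a hedged illustration (the node and packet names below are hypothetical, not from the spec), a conservative first version of a business-critical hop might simply declare `durable` and be tuned later:

```yaml
edges:
  - from: ValidateOrder        # illustrative node names
    to: ChargePayment
    delivery:
      mode: durable            # strong general-purpose guarantee; tune later
      packet: OrderValidated   # illustrative packet name
```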
## The five modes

### `direct`

**In-process, synchronous delivery with zero durability.**

The source node's output is passed directly to the destination node in the same process, with no intermediate storage. If the process crashes between the two nodes, the packet is lost. There is no queue, no retry, and no replay.

**When to use it:** Cheap, deterministic transformations where speed matters more than durability. Validation steps, format conversions, field extractions, in-memory aggregations.

**Guarantees:**

| Property | Value |
| --- | --- |
| Durability | None |
| Replay | No |
| Restart-safe | No |
| Latency | Microseconds |

```yaml
edges:
  - from: ParseJson
    to: ValidateSchema
    delivery:
      mode: direct
      packet: RawPayload
```

```mermaid
flowchart LR
    A[ParseJson] -->|"direct\nin-process"| B[ValidateSchema]
```

**Real-world example:** A high-throughput log ingestion pipeline where each event is parsed and field-extracted before being passed to the next stage. Parsing is cheap and deterministic — losing an event in a crash is acceptable because the source will resend.

---

### `ephemeral`

**Redis / NATS / RabbitMQ queue with low durability and built-in worker pool smoothing.**

Packets are written to a Redis stream, NATS queue, or RabbitMQ queue. Worker processes consume at their own pace, providing natural backpressure and burst absorption. If the broker restarts without persistence, unconsumed messages are lost. There is no replay after consumption.

**When to use it:** Medium-throughput steps that benefit from decoupling producer and consumer rates. Steps with variable processing time where you want to absorb traffic spikes without back-pressuring the upstream node.
**Guarantees:**

| Property | Value |
| --- | --- |
| Durability | Low (Redis AOF optional) |
| Replay | No (after consumption) |
| Restart-safe | Partial (depends on Redis persistence config) |
| Latency | Milliseconds |

```yaml
edges:
  - from: IngestEvent
    to: EnrichPayload
    delivery:
      mode: ephemeral
      packet: RawEvent
      stream: enrich-queue
      maxLen: 100000
```

```mermaid
flowchart LR
    A[IngestEvent] -->|"ephemeral\nRedis stream"| B[EnrichPayload]
```

**Real-world example:** A webhook receiver that ingests events from multiple sources at unpredictable rates and feeds a normalizer node. The Redis stream absorbs traffic spikes and allows the normalizer pool to drain at a steady pace.

---

### `checkpoint`

**Mongo / Redis / Postgres backed pipeline state with stage-level durability and resume support.**

After each node in a `checkpoint` chain completes, the runtime saves the output packet to the configured store (MongoDB, Redis, or Postgres). If a node or the runtime process fails, the pipeline can resume from the last successful checkpoint rather than restarting from the beginning. This is ideal for long multi-stage pipelines where reprocessing from the source is expensive.

**When to use it:** Long ETL pipelines, multi-stage data processing, document transformation pipelines where each step is expensive and restarting from scratch is unacceptable.
**Guarantees:**

| Property | Value |
| --- | --- |
| Durability | Stage-level (last completed stage is persisted) |
| Replay | Yes (from last checkpoint) |
| Restart-safe | Yes |
| Latency | Low to medium (MongoDB write per stage) |

```yaml
edges:
  - from: ExtractText
    to: ChunkDocument
    delivery:
      mode: checkpoint
      packet: ExtractedText
      checkpointInterval: 1
  - from: ChunkDocument
    to: EmbedChunks
    delivery:
      mode: checkpoint
      packet: DocumentChunks
```

```mermaid
flowchart LR
    A[ExtractText] -->|"checkpoint\nMongoDB snapshot"| B[ChunkDocument]
    B -->|"checkpoint\nMongoDB snapshot"| C[EmbedChunks]
```

**Real-world example:** A document intelligence pipeline that extracts text from PDFs, chunks it, embeds each chunk, and passes it to an LLM summarizer. Each stage is expensive. If the embedding step fails mid-way, the pipeline resumes from the last saved chunk batch rather than re-extracting and re-chunking.

---

### `durable`

**Mongo / Postgres backed packet-level durability with guaranteed delivery and idempotency support.**

Every packet is persisted to the configured store (MongoDB or Postgres) before delivery to the consumer. The consumer explicitly acknowledges receipt. If the consumer crashes before acknowledging, the packet is redelivered. This is the strongest delivery guarantee available short of `stream`. Combined with `idempotencyKey`, it provides exactly-once processing semantics.

**When to use it:** Business-critical transitions where data loss is unacceptable — payments, SMS/email sends, external API calls, LLM invocations, support ticket creation.
**Guarantees:**

| Property | Value |
| --- | --- |
| Durability | Packet-level (every packet persisted before delivery) |
| Replay | Yes (until acknowledged) |
| Restart-safe | Yes |
| Latency | Low (MongoDB write, typically <5ms) |

```yaml
edges:
  - from: LlmAnalyzer
    to: SendSmsAlert
    delivery:
      mode: durable
      packet: AlertPayload
      retryPolicy:
        maxAttempts: 3
        backoff: exponential
        initialDelay: PT2S
        maxDelay: PT30S
      idempotencyKey: "{{payload.alertId}}-sms"
```

```mermaid
flowchart LR
    A[LlmAnalyzer] -->|"durable\nMongoDB packet"| B[SendSmsAlert]
```

**Real-world example:** An email triage flow where the urgency classification result triggers an SMS alert to an on-call engineer. The SMS send must not be duplicated if the node retries, and must not be lost if the process crashes. `durable` with `idempotencyKey` guarantees exactly-once delivery.

---

### `stream`

**Kafka / Redis / NATS durable event stream with fan-out and external consumer support.**

Packets are published to a streaming backend (Kafka topic, Redis stream, or NATS JetStream). Multiple independent consumers can read from the same topic — internal FlowDSL nodes, external services, analytics systems, or audit logs. Messages are retained for a configurable period regardless of consumer state. This is the integration mode for connecting FlowDSL to the broader event-driven ecosystem.

**When to use it:** External integration, event sourcing, fan-out to multiple consumers, audit trails, cross-team event contracts.
**Guarantees:**

| Property | Value |
| --- | --- |
| Durability | Durable stream (Kafka retention policy) |
| Replay | Yes (by offset reset) |
| Restart-safe | Yes |
| Latency | Low to medium (Kafka write, typically 5–20ms) |

```yaml
edges:
  - from: ProcessOrder
    to: PublishOrderEvent
    delivery:
      mode: stream
      packet: OrderProcessed
      topic: orders.processed
      consumerGroup: fulfillment-workers
```

```mermaid
flowchart LR
    A[ProcessOrder] -->|"stream\nKafka topic"| B[PublishOrderEvent]
    B --> C[External consumers]
    B --> D[Analytics]
    B --> E[Audit log]
```

**Real-world example:** An order processing flow that publishes a processed order event to a Kafka topic. The fulfillment service, the analytics pipeline, and the audit log all consume from the same topic independently. Adding a new consumer requires no changes to the FlowDSL document.

---

## Comparison table

| Mode | Transport | Durability | Replay | Restart-safe | Latency | Best for |
| --- | --- | --- | --- | --- | --- | --- |
| `direct` | In-process | None | No | No | Microseconds | Cheap transforms, same-process steps |
| `ephemeral` | Redis / NATS / RabbitMQ | Low | No | Partial | Milliseconds | Burst smoothing, worker pools |
| `checkpoint` | Mongo / Redis / Postgres | Stage-level | Yes | Yes | Low–medium | Long multi-stage ETL pipelines |
| `durable` | Mongo / Postgres | Packet-level | Yes | Yes | Low | Business-critical, LLM calls, payments |
| `stream` | Kafka / Redis / NATS | Durable stream | Yes | Yes | Low–medium | External integration, fan-out, sourcing |

## Choosing a mode

Not sure which mode to use? See [How to choose the right delivery mode](https://flowdsl.com/docs/guides/choosing-delivery-modes) for a decision tree that walks through the key questions.

The short version: **when in doubt, use `durable`**. It has strong guarantees and reasonable performance.
You can optimize to `direct` or `ephemeral` once you have real performance data.

## Next steps

- [Edges](https://flowdsl.com/docs/concepts/edges) — how to declare edges with delivery policies
- [Retry Policies](https://flowdsl.com/docs/concepts/retry-policies) — retrying failed deliveries
- [Choosing Delivery Modes](https://flowdsl.com/docs/guides/choosing-delivery-modes) — practical decision guide

# Edges

An edge connects the output of one node to the input of another and declares the delivery policy that governs that connection. The delivery policy specifies the transport mode, the packet type, retry behavior, and idempotency semantics.

The runtime reads the edge and establishes the corresponding infrastructure connection — a Redis stream, a MongoDB collection, or a Kafka topic — according to the declared mode.

## Edge structure

```yaml
edges:
  - from: SourceNode                          # "NodeName" or "NodeName.outputPort"
    to: DestinationNode                       # "NodeName" or "NodeName.inputPort"
    delivery:
      mode: durable                           # The delivery mode (required)
      packet: MyPacket                        # Packet type reference (optional)
      retryPolicy:                            # Retry behavior (optional)
        maxAttempts: 3
        backoff: exponential
        initialDelay: PT2S
      idempotencyKey: "{{payload.id}}-action" # Deduplication key (optional)
```

### Fields

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| `from` | string | Yes | Source: `"NodeName"` or `"NodeName.outputPort"` |
| `to` | string | Yes | Destination: `"NodeName"` or `"NodeName.inputPort"` |
| `delivery` | object | Yes | The delivery policy object |
| `when` | string | No | Condition expression for conditional edges |

## Simple edges

A simple edge connects any output of the source to any input of the destination:

```yaml
edges:
  - from: ParseJson
    to: ValidateFields
    delivery:
      mode: direct
      packet: RawPayload
```

When a node has a single input port and a single output port, omitting port names is idiomatic.
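The canonical validator for flow documents is the JSON Schema in the spec repo. Purely as an illustration of the required fields above, a loader might check an edge represented as a plain dict like this (a minimal sketch, not the official SDK API):

```python
# Illustrative sketch: check the required edge fields from the table above.
# The real validation is done by the JSON Schema in flowdsl/spec.
VALID_MODES = {"direct", "ephemeral", "checkpoint", "durable", "stream"}

def validate_edge(edge: dict) -> list[str]:
    """Return a list of problems with an edge definition (empty means valid)."""
    errors = []
    for field in ("from", "to"):
        if field not in edge:
            errors.append(f"missing required field '{field}'")
    delivery = edge.get("delivery")
    if not isinstance(delivery, dict):
        errors.append("missing required field 'delivery'")
    elif delivery.get("mode") not in VALID_MODES:
        errors.append(f"unknown delivery mode: {delivery.get('mode')!r}")
    return errors

# A well-formed simple edge produces no errors:
print(validate_edge({"from": "ParseJson", "to": "ValidateFields",
                     "delivery": {"mode": "direct", "packet": "RawPayload"}}))  # []
```

Note that `from`/`to` port suffixes (`NodeName.outputPort`) pass through unchanged here; a fuller checker would also resolve them against the `nodes` map.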
## Named port edges

When a node has multiple outputs (like a router), edges must specify which output port to draw from:

```yaml
nodes:
  RouteByPriority:
    operationId: route_by_priority
    kind: router
    outputs:
      urgent: { packet: EventPayload }
      normal: { packet: EventPayload }

edges:
  - from: RouteByPriority.urgent
    to: UrgentHandler
    delivery:
      mode: durable
      packet: EventPayload
  - from: RouteByPriority.normal
    to: NormalHandler
    delivery:
      mode: ephemeral
      packet: EventPayload
```

The `.` syntax addresses a specific port: `NodeName.portName`.

## Conditional edges

A `when` field adds a condition that the runtime evaluates against the packet before delivery. If the condition is false, the packet is not delivered on that edge:

```yaml
edges:
  - from: ScoreLead
    to: AssignToSalesRep
    when: "payload.score >= 80"
    delivery:
      mode: durable
      packet: ScoredLead
  - from: ScoreLead
    to: AddToNurture
    when: "payload.score >= 40 && payload.score < 80"
    delivery:
      mode: durable
      packet: ScoredLead
  - from: ScoreLead
    to: ArchiveLead
    when: "payload.score < 40"
    delivery:
      mode: direct
      packet: ScoredLead
```

::callout{type="info"}
Conditional edges are evaluated by the runtime against the packet payload. The expression syntax is a simple JSONPath-style predicate. Use a `router` node instead for complex routing logic that requires code.
::

## Edges with retry policies

```yaml
edges:
  - from: LlmAnalyzer
    to: CreateTicket
    delivery:
      mode: durable
      packet: AnalysisResult
      retryPolicy:
        maxAttempts: 5
        backoff: exponential
        initialDelay: PT1S
        maxDelay: PT60S
        jitter: true
```

If `CreateTicket` throws an error, the runtime waits `initialDelay` before retrying, doubling each time up to `maxDelay`. After `maxAttempts` failures, the packet is moved to the dead letter queue.
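To make that schedule concrete, here is an illustrative sketch of how exponential backoff maps the policy fields to wait times. Two details are assumptions, not spec-defined here: that the first attempt counts toward `maxAttempts` (so there are `maxAttempts - 1` retries), and that `jitter: true` means full jitter (a uniform draw between zero and the computed delay):

```python
import random

def backoff_delays(max_attempts: int, initial: float, max_delay: float,
                   jitter: bool = False) -> list[float]:
    """Seconds to wait before each retry: initialDelay, doubling, capped at maxDelay.

    Assumes the first attempt counts toward max_attempts, so there are
    max_attempts - 1 retries. With jitter, each delay is drawn uniformly
    from [0, computed delay].
    """
    delays = []
    delay = initial
    for _ in range(max_attempts - 1):
        delays.append(random.uniform(0, delay) if jitter else delay)
        delay = min(delay * 2, max_delay)
    return delays

# The policy above (maxAttempts: 5, PT1S initial, PT60S cap), without jitter:
print(backoff_delays(5, 1.0, 60.0))  # [1.0, 2.0, 4.0, 8.0]
```

With jitter enabled, retries spread out in time, which avoids synchronized retry storms when many packets fail at once.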
## Idempotency on edges

The `idempotencyKey` field on a delivery policy prevents duplicate processing when the runtime retries delivery after a crash between node execution and acknowledgment:

```yaml
edges:
  - from: ChargePayment
    to: SendConfirmation
    delivery:
      mode: durable
      packet: PaymentResult
      idempotencyKey: "{{payload.orderId}}-confirm-email"
```

The runtime uses this key to deduplicate — if a packet with the same key is already acknowledged, the duplicate is silently dropped. The key template uses `{{payload.field}}` syntax and should be globally unique for the intended logical operation.

## Different delivery modes per edge

It is common and encouraged to use different delivery modes on different edges within the same flow, matching the mode to the requirements of each step:

```mermaid
flowchart LR
    A[IngestEvent] -->|"direct"| B[ParseFields]
    B -->|"ephemeral"| C[EnrichData]
    C -->|"checkpoint"| D[NormalizeSchema]
    D -->|"durable"| E[CallLlm]
    E -->|"stream"| F[PublishResult]
```

```yaml
edges:
  - from: IngestEvent
    to: ParseFields
    delivery: { mode: direct, packet: RawEvent }
  - from: ParseFields
    to: EnrichData
    delivery: { mode: ephemeral, packet: ParsedEvent, stream: enrich-q }
  - from: EnrichData
    to: NormalizeSchema
    delivery: { mode: checkpoint, packet: EnrichedEvent }
  - from: NormalizeSchema
    to: CallLlm
    delivery:
      mode: durable
      packet: NormalizedEvent
      idempotencyKey: "{{payload.eventId}}-llm"
  - from: CallLlm
    to: PublishResult
    delivery: { mode: stream, packet: LlmResult, topic: results.processed }
```

## What happens when an edge fails

1. **Retry** — if a retry policy is configured, the runtime retries according to the backoff schedule.
2. **Dead letter** — after all retry attempts are exhausted, the packet is moved to a dead letter collection in MongoDB (for `durable`) or logged (for `direct`/`ephemeral`).
3. **Alert** — the runtime emits a metric and optionally triggers a dead letter alert if configured.
Packets in the dead letter queue can be inspected, corrected, and re-injected via the runtime API. ## Summary - Every edge has a `from`, `to`, and `delivery` policy. - Named port syntax (`NodeName.portName`) addresses specific router outputs. - `when` conditions enable content-based routing without a dedicated router node. - `idempotencyKey` prevents duplicate processing on retry. - Different edges in the same flow should use different modes appropriate to each step. ## Next steps - [Delivery Modes](https://flowdsl.com/docs/concepts/delivery-modes) — the five modes explained in full - [Retry Policies](https://flowdsl.com/docs/concepts/retry-policies) — configuring retry behavior - [Idempotency](https://flowdsl.com/docs/guides/idempotency) — writing safe idempotent nodes # Flows A FlowDSL flow is a **directed acyclic graph (DAG)** of nodes connected by edges. Each node performs a discrete unit of business logic. Each edge carries packets from one node to the next, governed by a delivery policy. The flow document is the authoritative description of this graph — the runtime reads it and handles all transport wiring. ## Top-level document structure ```yaml flowdsl: "1.0" # Spec version (required) info: # Document metadata (required) title: Order Fulfillment version: "2.1.0" description: Processes orders from receipt to confirmation externalDocs: # Optional links to AsyncAPI / OpenAPI docs url: https://api.mycompany.com/asyncapi.yaml description: AsyncAPI event contracts asyncapi: "./events.asyncapi.yaml" # Optional: path to AsyncAPI doc nodes: # Map of NodeName → Node definition (required) OrderReceived: ... ValidateOrder: ... edges: # Array of edge definitions (required) - from: OrderReceived to: ValidateOrder delivery: mode: direct components: # Reusable definitions (optional) packets: ... policies: ... 
``` ### Top-level fields | Field | Type | Required | Description | | -------------- | ------ | -------- | ----------------------------------------------------------- | | `flowdsl` | string | Yes | Spec version. Currently `"1.0"`. | | `info` | object | Yes | Title, version, description, contact, license. | | `externalDocs` | object | No | URL to external documentation (AsyncAPI, OpenAPI). | | `asyncapi` | string | No | Path or URL to an AsyncAPI document for message references. | | `openapi` | string | No | Path or URL to an OpenAPI document. | | `nodes` | object | Yes | Map of PascalCase node name → Node object. | | `edges` | array | Yes | Array of Edge objects. | | `components` | object | No | Shared packets, policies, and node templates. | ## Flow topology A flow can have: - **Multiple entry points** — source nodes that have no incoming edges. Each source node is an independent entry point that triggers when its input event arrives. - **Multiple terminal nodes** — nodes with no outgoing edges. A flow completes when all active paths reach a terminal node. - **Branching** — a router node with multiple named outputs sends packets down different paths based on content. - **Merging** — a node can receive inputs from multiple edges (uncommon; the runtime processes them independently). ```mermaid flowchart TD A[OrderReceived\nsource] --> B[ValidateOrder\ntransform] B --> C{RouteByPriority\nrouter} C -->|priority_high| D[ExpressProcess\naction] C -->|priority_normal| E[StandardProcess\naction] D --> F[SendConfirmation\naction] E --> F ``` ## Flow lifecycle ```mermaid flowchart LR A[Write .flowdsl.yaml] --> B[Validate against schema] B --> C[Deploy to runtime] C --> D[Runtime resolves nodes] D --> E[Runtime establishes connections] E --> F[Flow active: processing events] F --> G[Monitor via Studio] ``` 1. **Write** — author the flow in YAML or JSON. 2. **Validate** — run `flowdsl validate my-flow.flowdsl.yaml` or use Studio's Validate button. 
The runtime rejects invalid documents at startup. 3. **Deploy** — load the document into the runtime (file path, S3 URL, or API). 4. **Resolve** — the runtime contacts each node's address (from the node registry or `node-registry.yaml`) and verifies it is available. 5. **Connect** — the runtime establishes delivery connections for each edge (Redis streams, MongoDB collections, Kafka topics) according to the declared delivery modes. 6. **Active** — the flow processes events. Each event that arrives at a source node is a fresh execution context. ## Naming conventions | Element | Convention | Example | | --------------- | ------------ | ---------------------------------- | | Flow ID | `snake_case` | `order_fulfillment_v2` | | Node names | `PascalCase` | `OrderReceived`, `ValidatePayment` | | `operationId` | `snake_case` | `validate_payment_amount` | | Component names | `PascalCase` | `OrderPayload`, `PaymentResult` | ## A complete flow example ```yaml flowdsl: "1.0" info: title: Order Fulfillment version: "1.0.0" description: End-to-end order processing from receipt to customer confirmation nodes: OrderReceived: operationId: receive_order kind: source summary: Entry point — receives new order events from the API gateway outputs: out: packet: OrderPayload ValidateOrder: operationId: validate_order kind: transform summary: Validates order fields and computes totals inputs: in: packet: OrderPayload outputs: out: packet: ValidatedOrder ChargePayment: operationId: charge_payment kind: action summary: Charges the payment method via Stripe inputs: in: packet: ValidatedOrder outputs: out: packet: PaymentResult SendConfirmation: operationId: send_confirmation_email kind: action summary: Sends order confirmation email to customer inputs: in: packet: PaymentResult edges: - from: OrderReceived to: ValidateOrder delivery: mode: direct packet: OrderPayload - from: ValidateOrder to: ChargePayment delivery: mode: durable packet: ValidatedOrder retryPolicy: maxAttempts: 3 backoff: 
exponential initialDelay: PT2S - from: ChargePayment to: SendConfirmation delivery: mode: durable packet: PaymentResult idempotencyKey: "{{payload.orderId}}-confirm" components: packets: OrderPayload: type: object properties: orderId: { type: string } customerId: { type: string } items: { type: array } total: { type: number } required: [orderId, customerId, items, total] ValidatedOrder: type: object properties: orderId: { type: string } customerId: { type: string } total: { type: number } tax: { type: number } currency: { type: string } required: [orderId, customerId, total, currency] PaymentResult: type: object properties: orderId: { type: string } chargeId: { type: string } amount: { type: number } status: { type: string, enum: [succeeded, failed] } required: [orderId, chargeId, status] ``` ## Summary - A flow is a DAG of nodes connected by edges, declared in a single YAML or JSON file. - The file is the source of truth — the runtime is an implementation of what the file declares. - Flows can have multiple entry points, branching, and multiple terminal paths. - Node names are `PascalCase`; `operationId` values are `snake_case`. ## Next steps - [Nodes](https://flowdsl.com/docs/concepts/nodes) — the nine node kinds in detail - [Edges](https://flowdsl.com/docs/concepts/edges) — delivery policies and edge configuration - [Your First Flow](https://flowdsl.com/docs/tutorials/your-first-flow) — build a complete flow step by step # Concepts This section covers the core vocabulary and mechanics of the FlowDSL specification. If you are new, read [What is FlowDSL?](https://flowdsl.com/docs/concepts/what-is-flowdsl) first, then work through the pages in order. If you know the basics, jump directly to the concept you need. 
## Pages in this section ### [What is FlowDSL?](https://flowdsl.com/docs/concepts/what-is-flowdsl) FlowDSL as a specification — where it sits in the API ecosystem alongside OpenAPI and AsyncAPI, the four layers of a FlowDSL system, and how a document moves from definition to execution. ### [Flows](https://flowdsl.com/docs/concepts/flows) A flow is a directed acyclic graph of nodes connected by edges. Covers the top-level document structure, flow lifecycle, and how the runtime loads and runs flows. ### [Nodes](https://flowdsl.com/docs/concepts/nodes) Nodes are the units of business logic. Covers the nine node kinds, node structure, the bilateral contract model, and the node manifest format. ### [Edges](https://flowdsl.com/docs/concepts/edges) Edges connect nodes and carry delivery policies. Covers edge structure, named port addressing, conditional routing, and failure behavior. ### [Delivery Modes](https://flowdsl.com/docs/concepts/delivery-modes) The five delivery modes — `direct`, `ephemeral`, `checkpoint`, `durable`, `stream` — are the most important concept in FlowDSL. Each mode has distinct durability, latency, and replay guarantees. ### [Packets](https://flowdsl.com/docs/concepts/packets) Packets are typed schemas for the data flowing along edges. Can be defined natively in `components.packets` or referenced from an AsyncAPI document. ### [Retry Policies](https://flowdsl.com/docs/concepts/retry-policies) Configure automatic retry behavior for failed edge deliveries, with fixed, linear, and exponential backoff support. ### [Checkpoints](https://flowdsl.com/docs/concepts/checkpoints) The `checkpoint` delivery mode snapshots pipeline state to MongoDB so the runtime can resume from the last successful stage after a failure. ### [Node Registry](https://flowdsl.com/docs/concepts/node-registry) Where node implementations are published and discovered. Covers `repo.flowdsl.com`, local `node-registry.yaml`, and node resolution. 
--- > **Core principle:** Nodes define business logic. Edges define delivery semantics. The runtime enforces guarantees. # Node Registry The node registry is where FlowDSL node implementations are published, versioned, and discovered. When the runtime loads a flow document, it looks up each node's `operationId` in the registry to find the implementation's network address and connect to it. ## How node resolution works ```mermaid flowchart LR A["Flow document\noperationId: llm_classify_email"] --> B[Runtime] B --> C[Local node-registry.yaml] C -->|"found: localhost:8082"| B B -->|"connect"| D["LLM node\nrunning on :8082"] ``` 1. The runtime reads the flow document and collects all `operationId` values. 2. For each ID, it queries `node-registry.yaml` (local) or `repo.flowdsl.com` (remote). 3. The registry returns the node's address and version. 4. The runtime establishes a gRPC or HTTP connection to the node process. 5. When a packet arrives on an edge, the runtime calls the node's `Handle` method. ## node-registry.yaml For local development, define a `node-registry.yaml` file in your project: ```yaml nodes: receive_order: address: localhost:8080 version: "1.0.0" runtime: go validate_order: address: localhost:8081 version: "1.0.0" runtime: go llm_classify_email: address: localhost:8082 version: "2.1.0" runtime: python send_sms_alert: address: localhost:8083 version: "1.3.0" runtime: go charge_payment: address: payment-service.internal:9090 version: "3.0.0" runtime: go ``` Set the registry file path via environment variable: ```bash FLOWDSL_REGISTRY_FILE=./node-registry.yaml flowdsl-runtime start my-flow.flowdsl.yaml ``` ## repo.flowdsl.com (coming soon) The public node registry at `repo.flowdsl.com` will allow developers to publish and discover FlowDSL nodes. Published nodes are resolved by `operationId` and version. The runtime fetches the node's network endpoint from the registry automatically. 
```bash # Publishing a node (coming soon) flowdsl publish --manifest flowdsl-node.json # Specifying a registry in your flow registry: https://repo.flowdsl.com ``` ## The flowdsl-node.json manifest Every node implementation ships a `flowdsl-node.json` manifest. This is the node's identity document — it describes the node's contract and metadata: ```json { "operationId": "llm_classify_email", "name": "LLM Email Classifier", "version": "2.1.0", "description": "Classifies emails as urgent, normal, or spam using an LLM", "runtime": "python", "inputs": [ { "name": "in", "packet": "EmailPayload", "description": "The email to classify" } ], "outputs": [ { "name": "out", "packet": "ClassifiedEmail", "description": "Email with classification and confidence score" } ], "settings": { "type": "object", "properties": { "model": { "type": "string", "default": "gpt-4o-mini" }, "temperature": { "type": "number", "default": 0.1 } } }, "repository": "https://github.com/myorg/flowdsl-nodes", "author": "My Team", "license": "Apache-2.0", "tags": ["llm", "email", "classification"] } ``` ## Local vs remote nodes | Mode | When to use | Address | | ---------- | ------------------------- | --------------------------------- | | **Local** | Development, testing | `localhost:PORT` | | **Docker** | Local multi-service setup | `node-name:PORT` (Docker network) | | **Remote** | Production | `node-service.namespace.svc:PORT` | The runtime doesn't care whether a node is local, in Docker, or across a network — it connects to whatever address the registry provides. ## Node versioning The runtime can enforce version constraints: ```yaml # node-registry.yaml nodes: llm_classify_email: address: localhost:8082 version: "2.1.0" minVersion: "2.0.0" # Reject older versions ``` The node's `flowdsl-node.json` declares its own version, and the runtime rejects nodes that don't meet the minimum version constraint. ## Summary - `node-registry.yaml` maps `operationId` to network addresses for local development. 
- `repo.flowdsl.com` is the upcoming public registry for published nodes. - Every node ships a `flowdsl-node.json` manifest describing its contract. - The runtime connects to nodes at startup and calls them on packet arrival. ## Next steps - [Write a Go Node](https://flowdsl.com/docs/tutorials/writing-a-go-node) — implement a node and write its manifest - [Node Manifest reference](https://flowdsl.com/docs/reference/node-manifest) — full manifest field reference - [Node Development](https://flowdsl.com/docs/guides/node-development) — building and publishing nodes # Nodes A node is the unit of business logic in a FlowDSL flow. It has a clearly defined **contract** — named input ports and named output ports, each carrying a specific packet type. The runtime calls the node with the input packet and routes the output packet to the next node according to the edge's delivery policy. Nodes are stateless with respect to transport — they never touch Kafka, Redis, or MongoDB directly. ## Node kinds FlowDSL defines nine node kinds that describe a node's role in the flow: | Kind | Role | Typical use | | ------------- | --------------------------------------------------------------- | ----------------------------------------------------- | | `source` | Entry point — no inputs, only outputs | Webhook receiver, event consumer, scheduler | | `transform` | Maps input to output with the same or different schema | Field extraction, format conversion, computation | | `router` | Routes packets to one of several named outputs based on content | Priority routing, conditional branching, A/B split | | `llm` | Calls a language model | Classification, summarization, extraction, generation | | `action` | Performs a side effect in an external system | Send email, charge payment, create ticket, call API | | `checkpoint` | Saves pipeline state and passes through | Resumable pipeline stage marker | | `publish` | Publishes to an event bus or message broker | Emit to Kafka, push to webhook | | 
`terminal` | End of a path — no outputs | Archive, discard, log final result | | `integration` | Bridges to an external FlowDSL flow | Cross-flow composition | ## Node structure ```yaml nodes: FilterByPriority: operationId: filter_by_priority # snake_case, matches the handler function kind: router summary: Routes events by priority level description: | Reads the priority field from the incoming payload and routes to urgent_out for P0/P1 events, or normal_out for all others. inputs: in: packet: EventPayload description: Incoming event to classify outputs: urgent_out: packet: EventPayload description: P0 and P1 events normal_out: packet: EventPayload description: P2 and below events settings: urgentPriorities: [P0, P1] x-ui: position: { x: 320, y: 180 } color: "#7c3aed" icon: filter ``` ### Fields | Field | Type | Required | Description | | ------------- | ------ | -------- | --------------------------------------------------------------------------------------- | | `operationId` | string | Yes | Unique `snake_case` identifier. Maps to the handler function registered in the runtime. | | `kind` | string | Yes | One of the nine node kinds. | | `summary` | string | No | One-line description for Studio and documentation. | | `description` | string | No | Longer markdown description. | | `inputs` | object | No | Map of port name → Port object. | | `outputs` | object | No | Map of port name → Port object. | | `settings` | object | No | Static configuration passed to the handler at initialization. | | `x-ui` | object | No | Canvas layout hints for Studio (position, color, icon). | ### Port object ```yaml inputs: in: packet: EmailPayload # Reference to components.packets or asyncapi#/... description: The email to analyze ``` A port has a `packet` (packet type reference) and an optional `description`. 
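Tying the kinds table to the port model above: `source` nodes declare no inputs and `terminal` nodes declare no outputs. A linter for those two structural rules might look like this sketch (the `Node` type and `checkPorts` function are illustrative, not part of any FlowDSL SDK):

```go
package main

import "fmt"

// Node captures just the fields relevant to structural port checks.
type Node struct {
	Kind    string
	Inputs  []string
	Outputs []string
}

// checkPorts applies the structural rules implied by the kinds table:
// a source is an entry point with no inputs; a terminal ends a path
// and has no outputs.
func checkPorts(n Node) error {
	switch n.Kind {
	case "source":
		if len(n.Inputs) > 0 {
			return fmt.Errorf("source nodes take no inputs")
		}
	case "terminal":
		if len(n.Outputs) > 0 {
			return fmt.Errorf("terminal nodes have no outputs")
		}
	}
	return nil
}

func main() {
	ok := Node{Kind: "source", Outputs: []string{"out"}}
	bad := Node{Kind: "terminal", Inputs: []string{"in"}, Outputs: []string{"out"}}
	fmt.Println(checkPorts(ok) == nil)  // true
	fmt.Println(checkPorts(bad) != nil) // true
}
```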
## The bilateral contract The visual representation of a node in Studio and on the spec page is a **bilateral contract card** — a dark card showing the node's input ports on the left and output ports on the right. This makes the node's contract immediately readable: what goes in, what comes out, and what types are involved. This is unique to FlowDSL. OpenAPI shows endpoints; AsyncAPI shows channels; FlowDSL shows executable bilateral contracts. ```text ┌──────────────────────────────────────────────────────┐ │ [transform] transform_order_fields │ │ TransformOrder — Extracts and normalizes order data │ ├────────────────────────┬─────────────────────────────┤ │ INPUTS │ OUTPUTS │ │ │ │ │ in OrderPayload ───►│►─── out NormalizedOrder │ └────────────────────────┴─────────────────────────────┘ ``` ## Node examples by kind ### source ```yaml OrderReceived: operationId: receive_order kind: source summary: Receives new order events outputs: out: packet: OrderPayload ``` ### transform ```yaml NormalizeOrder: operationId: normalize_order_fields kind: transform summary: Normalizes currency and address fields inputs: in: { packet: RawOrder } outputs: out: { packet: NormalizedOrder } ``` ### router ```yaml RouteByStatus: operationId: route_order_by_status kind: router summary: Routes orders to the correct processing path inputs: in: { packet: Order } outputs: approved: { packet: Order } pending_review: { packet: Order } rejected: { packet: Order } ``` ### llm ```yaml ClassifyEmail: operationId: llm_classify_email kind: llm summary: Classifies email as urgent, normal, or spam inputs: in: { packet: EmailPayload } outputs: out: { packet: ClassifiedEmail } settings: model: gpt-4o-mini systemPrompt: "Classify this email as: urgent, normal, or spam. Return JSON." 
temperature: 0.1
```

### action

```yaml
SendSmsAlert:
  operationId: send_sms_alert
  kind: action
  summary: Sends an SMS alert via Twilio
  inputs:
    in: { packet: AlertPayload }
  outputs:
    out: { packet: SmsResult }
```

### terminal

```yaml
ArchiveSpam:
  operationId: archive_spam_email
  kind: terminal
  summary: Archives the email in the spam folder
  inputs:
    in: { packet: ClassifiedEmail }
```

## Nodes must not own transport semantics

A node handler should never call Kafka, open a MongoDB connection, or write to Redis directly. Those are the runtime's responsibility. The node receives its input packet, does its computation or side effect, and returns its output packet. This constraint is what makes nodes portable and independently testable.

```go
// CORRECT: node knows nothing about delivery
func (n *FilterNode) Handle(ctx context.Context, input flowdsl.NodeInput) (flowdsl.NodeOutput, error) {
	payload, _ := input.Packet("in")
	if payload.GetString("priority") == "urgent" {
		return flowdsl.NodeOutput{}.Send("urgent_out", payload), nil
	}
	return flowdsl.NodeOutput{}.Send("normal_out", payload), nil
}

// WRONG: node writing directly to Kafka
func (n *FilterNode) Handle(ctx context.Context, input flowdsl.NodeInput) (flowdsl.NodeOutput, error) {
	payload, _ := input.Packet("in")
	// DO NOT do this — this is the runtime's job
	producer.Produce("urgent-topic", payload)
	return flowdsl.NodeOutput{}, nil
}
```

## The flowdsl-node.json manifest

Every node implementation ships a `flowdsl-node.json` manifest that describes it to the registry:

```json
{
  "operationId": "filter_by_priority",
  "name": "Filter by Priority",
  "version": "1.2.0",
  "description": "Routes events to different outputs based on priority level",
  "runtime": "go",
  "inputs": [
    { "name": "in", "packet": "EventPayload", "description": "Incoming event" }
  ],
  "outputs": [
    { "name": "urgent_out", "packet": "EventPayload", "description": "P0/P1 events" },
    { "name": "normal_out", "packet": "EventPayload", "description": "P2+ events" }
  ],
  "settings": {
    "type": "object",
    "properties": {
      "urgentPriorities":
{ "type": "array", "items": { "type": "string" } } } }, "repository": "https://github.com/myorg/flowdsl-nodes", "author": "My Team", "license": "Apache-2.0", "tags": ["routing", "priority"] } ``` ## Summary - Nodes declare typed input and output ports — the bilateral contract. - Nine kinds cover every role: source, transform, router, llm, action, checkpoint, publish, terminal, integration. - Nodes must not own transport semantics — the runtime handles delivery. - `operationId` is `snake_case`; node names are `PascalCase`. ## Next steps - [Edges](https://flowdsl.com/docs/concepts/edges) — connecting nodes with delivery policies - [Write a Go Node](https://flowdsl.com/docs/tutorials/writing-a-go-node) — implement a node using the Go SDK - [Write a Python Node](https://flowdsl.com/docs/tutorials/writing-a-python-node) — implement a node using the Python SDK # Packets A packet is a typed schema that describes the data traveling along an edge. Every edge optionally declares a `packet` type in its delivery policy — the runtime uses this to validate the shape of data passing between nodes and to generate documentation in Studio. ## Native packets Define packets directly in your FlowDSL document under `components.packets`. 
Each packet is a JSON Schema Draft-07 object: ```yaml components: packets: OrderPayload: type: object properties: orderId: type: string description: Unique order identifier customerId: type: string items: type: array items: type: object properties: sku: { type: string } qty: { type: integer, minimum: 1 } price: { type: number } required: [sku, qty, price] total: type: number minimum: 0 currency: type: string enum: [USD, EUR, GBP] required: [orderId, customerId, items, total, currency] PaymentResult: type: object properties: orderId: { type: string } chargeId: { type: string } status: type: string enum: [succeeded, failed, pending] amount: { type: number } required: [orderId, chargeId, status] ``` Reference a packet from an edge: ```yaml edges: - from: ValidateOrder to: ChargePayment delivery: mode: durable packet: OrderPayload # References components.packets.OrderPayload ``` ## AsyncAPI-referenced packets When you have an existing AsyncAPI document, reference its message schemas directly instead of duplicating them: ```yaml # In your FlowDSL document asyncapi: "./events.asyncapi.yaml" edges: - from: OrderReceived to: ProcessOrder delivery: mode: durable packet: "asyncapi#/components/messages/OrderPlaced" ``` The runtime resolves the reference by loading the AsyncAPI document and extracting the message schema at the given JSON Pointer path. The packet is validated at runtime against the resolved schema. ## Packet naming | Convention | Correct | Incorrect | | ------------- | --------------------------------- | ------------------------------- | | PascalCase | `OrderPayload`, `EmailMessage` | `orderPayload`, `email_message` | | Descriptive | `ClassifiedEmail` | `Payload`, `Data` | | Role-specific | `SmsAlertInput`, `SmsAlertOutput` | `SmsPayload` (ambiguous) | ## Validation The runtime validates packets at each edge: - **At startup:** Verifies that all referenced packet names exist in `components.packets` or can be resolved from the referenced AsyncAPI document. 
- **At runtime:** Validates each packet against its JSON Schema before delivery. Invalid packets are rejected and moved to the dead letter queue. ## When to define packets Define a packet when: - Multiple edges share the same schema (reuse the name) - The schema is complex enough to benefit from a named definition - You want Studio to show the packet structure in the NodeContractCard Omit the `packet` field when: - The edge is using `direct` mode between two nodes you control and schema validation is handled inside the node - You are early in development and the schema is still evolving ## Summary - Packets are JSON Schema Draft-07 objects defined under `components.packets`. - Reference them by name on edge delivery policies. - Or reference AsyncAPI messages using `asyncapi#/components/messages/MessageName`. - PascalCase naming convention for all packet names. ## Next steps - [Edges](https://flowdsl.com/docs/concepts/edges) — how packets are used on edges - [Connecting AsyncAPI](https://flowdsl.com/docs/tutorials/connecting-asyncapi) — referencing AsyncAPI schemas - [Components reference](https://flowdsl.com/docs/reference/spec/components) — full components section reference # Retry Policies A retry policy configures what the runtime does when a node handler throws an error or times out. It lives on the edge's delivery policy, not on the node — consistent with the principle that delivery semantics belong to edges, not nodes. Retry policies only apply to `durable` and `ephemeral` delivery modes. `direct` edges propagate errors immediately. `stream` edges rely on Kafka's consumer group retry mechanisms. 
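The applicability rule above is simple enough to state as code. A sketch (the `retriesApply` helper is illustrative, not a runtime API):

```go
package main

import "fmt"

// retriesApply reports whether a retry policy has any effect for a given
// delivery mode. direct edges propagate errors immediately and stream
// edges defer to Kafka's consumer-group retries, so only durable and
// ephemeral edges honor a retryPolicy.
func retriesApply(mode string) bool {
	return mode == "durable" || mode == "ephemeral"
}

func main() {
	for _, m := range []string{"direct", "ephemeral", "checkpoint", "durable", "stream"} {
		fmt.Printf("%s: %t\n", m, retriesApply(m))
	}
}
```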
## RetryPolicy structure ```yaml retryPolicy: maxAttempts: 5 # Total attempts including the first (required) backoff: exponential # "fixed" | "linear" | "exponential" (required) initialDelay: PT2S # ISO 8601 duration (required) maxDelay: PT60S # Cap on backoff delay (optional) jitter: true # Add random ±20% jitter to backoff (optional, default: false) retryOn: # Error codes to retry on (optional, default: all) - TIMEOUT - RATE_LIMITED - TEMPORARY_FAILURE ``` ### Fields | Field | Type | Required | Default | Description | | -------------- | ----------------- | -------- | ---------- | --------------------------------------------- | | `maxAttempts` | integer (1–10) | Yes | — | Total delivery attempts including the first. | | `backoff` | string | Yes | — | `"fixed"`, `"linear"`, or `"exponential"` | | `initialDelay` | ISO 8601 duration | Yes | — | Delay before the first retry. | | `maxDelay` | ISO 8601 duration | No | Unlimited | Maximum delay between retries. | | `jitter` | boolean | No | `false` | Adds random variance to prevent retry storms. | | `retryOn` | array of string | No | All errors | Limit retries to specific error codes. | ## Backoff strategies ### Fixed Every retry waits the same `initialDelay`: ```text Attempt 1 → fail → wait 5s → Attempt 2 → fail → wait 5s → Attempt 3 ``` ```yaml retryPolicy: maxAttempts: 3 backoff: fixed initialDelay: PT5S ``` **Use for:** Short, deterministic operations where you want predictable retry timing. ### Linear Each retry adds one `initialDelay` to the previous wait: ```text Attempt 1 → fail → wait 2s → Attempt 2 → fail → wait 4s → Attempt 3 → fail → wait 6s ``` ```yaml retryPolicy: maxAttempts: 4 backoff: linear initialDelay: PT2S maxDelay: PT30S ``` **Use for:** Operations that may need a bit more time with each retry. 
### Exponential The wait doubles each time, up to `maxDelay`: ```text Attempt 1 → fail → wait 1s → Attempt 2 → fail → wait 2s → Attempt 3 → fail → wait 4s → Attempt 4 → fail → wait 8s ``` ```yaml retryPolicy: maxAttempts: 5 backoff: exponential initialDelay: PT1S maxDelay: PT60S jitter: true ``` **Use for:** External API calls, LLM invocations, network-dependent operations. Exponential backoff with jitter is the standard choice for avoiding retry storms against rate-limited services. ## Complete examples ### SMS alert with exponential backoff ```yaml edges: - from: ClassifyUrgent to: SendSmsAlert delivery: mode: durable packet: AlertPayload idempotencyKey: "{{payload.alertId}}-sms" retryPolicy: maxAttempts: 4 backoff: exponential initialDelay: PT2S maxDelay: PT30S jitter: true ``` ### Payment charge with fixed retry ```yaml edges: - from: ValidateOrder to: ChargePayment delivery: mode: durable packet: ValidatedOrder idempotencyKey: "{{payload.orderId}}-charge" retryPolicy: maxAttempts: 3 backoff: fixed initialDelay: PT5S retryOn: [TIMEOUT, NETWORK_ERROR] ``` ### LLM call with long exponential backoff ```yaml edges: - from: PreparePrompt to: LlmSummarize delivery: mode: durable packet: PromptPayload idempotencyKey: "{{payload.documentId}}-summarize" retryPolicy: maxAttempts: 3 backoff: exponential initialDelay: PT5S maxDelay: PT120S retryOn: [RATE_LIMITED, TIMEOUT] ``` ## Dead letter behavior After all retry attempts are exhausted, the packet moves to the **dead letter queue** — a MongoDB collection named `{flowId}.dead_letters`. The dead letter record includes: - The original packet - The error from the last attempt - The number of attempts made - A timestamp for each attempt Packets in the dead letter queue can be re-injected via the runtime API after fixing the underlying issue. 
::callout{type="warning"} **Idempotency required with retries.** If a node partially completes before failing (e.g., an email is sent but the handler crashes before returning), retrying will call the node again. Always pair `durable` retry policies with an `idempotencyKey` to prevent duplicate side effects. :: ## Summary - Retry policies live on edges, not nodes. - Three strategies: `fixed`, `linear`, `exponential`. - `exponential` with `jitter: true` is the safe default for external calls. - Always add `idempotencyKey` when using retry policies on nodes with side effects. ## Next steps - [Idempotency](https://flowdsl.com/docs/guides/idempotency) — writing safe idempotent node handlers - [Error Handling](https://flowdsl.com/docs/guides/error-handling) — dead letters and recovery patterns - [RetryPolicy reference](https://flowdsl.com/docs/reference/spec/retry-policy) — field-by-field reference # What is FlowDSL? FlowDSL is an open specification for describing and executing event-driven flow graphs. A FlowDSL document is the single source of truth for how data moves through a system — which nodes process it, how it travels between them, and what delivery guarantees apply at each step. The runtime reads the document and enforces those guarantees; your application code never touches Kafka consumer groups or MongoDB retry logic directly. ## Where FlowDSL fits There are three open specifications that together describe a modern event-driven system: ```mermaid graph LR A[HTTP Request/Response] -->|described by| B[OpenAPI] C[Events and Messages] -->|described by| D[AsyncAPI] E[Flow Graphs] -->|described by| F[FlowDSL] D -->|optionally referenced by| F B -->|optionally referenced by| F ``` - **OpenAPI** describes the shape of HTTP APIs: paths, parameters, request/response schemas. - **AsyncAPI** describes event and message contracts: topics, channels, message schemas. 
- **FlowDSL** describes executable flow graphs: which nodes run, how they connect, and what delivery semantics apply to each connection. FlowDSL is **fully self-contained**. You do not need an AsyncAPI or OpenAPI document to write a working FlowDSL flow. Packets and events are defined natively in the `components` section. AsyncAPI and OpenAPI are optional integrations — useful when you already have those contracts and want to reference them directly rather than duplicating schema definitions. ## The four layers A FlowDSL system has four distinct layers, each with a clear responsibility: | Layer | What it is | Who owns it | | ------------------- | ----------------------------------------------------------------- | -------------------------------- | | **Contract** | The `.flowdsl.yaml` document | You (version-controlled in git) | | **Flow Definition** | Nodes and edges declared in the document | You (declarative YAML/JSON) | | **Runtime** | The process that reads the document and wires everything together | flowdsl-go or flowdsl-py runtime | | **Storage** | MongoDB, Redis, Kafka — the backing stores for delivery modes | Your infrastructure | Your job is to write the Contract. The Runtime's job is to implement everything the Contract declares. ## From definition to execution ```mermaid flowchart LR A["Developer writes\n.flowdsl.yaml"] --> B["Studio\nvalidates"] B --> C["Runtime\nreads document"] C --> D["Nodes\nexecute"] D --> E["Results published\nto Kafka / MongoDB"] ``` 1. You write a `.flowdsl.yaml` (or `.flowdsl.json`) file. 2. Studio (or the CLI) validates it against the JSON Schema at `https://flowdsl.com/schemas/v1/flowdsl.schema.json`. 3. The runtime reads the document, resolves node addresses, and establishes delivery connections. 4. When events arrive, nodes execute in the declared order. 5. Results are persisted or published according to the delivery mode on each edge. 
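Step 2 validates the document against the published JSON Schema. As a toy illustration of just the required-field portion of that check (the `Doc` type here is a simplification for the example, not the schema):

```go
package main

import "fmt"

// Doc models only the four required top-level fields of a FlowDSL document.
type Doc struct {
	Flowdsl string
	Info    map[string]string
	Nodes   map[string]any
	Edges   []any
}

// requiredFieldErrors is a drastically simplified stand-in for the real
// JSON Schema validation step; it checks only that required top-level
// fields (flowdsl, info, nodes, edges) are present.
func requiredFieldErrors(d Doc) []string {
	var errs []string
	if d.Flowdsl == "" {
		errs = append(errs, "flowdsl: required")
	}
	if d.Info == nil {
		errs = append(errs, "info: required")
	}
	if len(d.Nodes) == 0 {
		errs = append(errs, "nodes: required")
	}
	if len(d.Edges) == 0 {
		errs = append(errs, "edges: required")
	}
	return errs
}

func main() {
	d := Doc{Flowdsl: "1.0", Info: map[string]string{"title": "Demo"}}
	fmt.Println(requiredFieldErrors(d)) // nodes and edges are missing
}
```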
## File formats FlowDSL documents can be written in either format: | Format | Extension | When to use | | ------ | --------------- | --------------------------------------------------------- | | YAML | `.flowdsl.yaml` | Human authoring, version control, code review | | JSON | `.flowdsl.json` | Programmatic generation, API payloads, machine processing | JSON is the canonical format that the runtime loads. The CLI converts YAML to JSON during the build step. Both formats are schema-validated identically. ## A minimal FlowDSL document ```yaml flowdsl: "1.0" info: title: User Signup Notification version: "1.0.0" description: Sends a welcome email when a user signs up nodes: UserSignedUp: operationId: receive_signup kind: source summary: Receives new user signup events SendWelcomeEmail: operationId: send_welcome_email kind: action summary: Sends the welcome email via email provider edges: - from: UserSignedUp to: SendWelcomeEmail delivery: mode: durable packet: SignupPayload components: packets: SignupPayload: type: object properties: userId: type: string email: type: string format: email name: type: string required: [userId, email, name] ``` This is a complete, valid FlowDSL document. It declares two nodes, one edge between them using `durable` delivery (guaranteed, packet-level durability backed by MongoDB), and the packet schema for the data traveling along that edge. ## What FlowDSL is not - **Not a workflow engine** — FlowDSL is a specification. The runtime is a separate implementation (flowdsl-go, flowdsl-py) that reads the spec. You can swap runtimes without changing your document. - **Not an AsyncAPI extension** — FlowDSL is a peer specification, not a layer on top of AsyncAPI. AsyncAPI integration is optional. - **Not a visual-first tool** — Studio is a convenience for visualizing and editing flows. The YAML file is always the source of truth. ## Summary - FlowDSL is a specification (JSON Schema + rules), not a product. 
- A FlowDSL document is a directed graph of nodes connected by edges with explicit delivery modes. - The document is the source of truth; the runtime implements what it declares. - FlowDSL is self-contained but can optionally reference AsyncAPI and OpenAPI schemas. ## Next steps - [Flows](https://flowdsl.com/docs/concepts/flows) — the structure of a FlowDSL document in detail - [Nodes](https://flowdsl.com/docs/concepts/nodes) — the nine node kinds and how to define them - [Delivery Modes](https://flowdsl.com/docs/concepts/delivery-modes) — the most important concept in the spec # Delivery Modes The `delivery.mode` field on an edge controls the transport layer and durability guarantee. ## `direct` In-process function call. No broker, no serialization. Fastest path. - **Transport:** in-process - **Durability:** none - **Best for:** fast local transforms within the same process ### Protocol resolution When both nodes share the same language runtime (e.g. Go → Go), `direct` is a true in-process call. When the source and target run in different languages (e.g. Go → Python), the runtime transparently upgrades the call to gRPC while keeping the `direct` delivery semantics (no broker, no durability). | Source lang | Target lang | Actual transport | | ----------- | ----------- | -------------------------- | | Same | Same | In-process function call | | Different | Different | gRPC (transparent upgrade) | See [Communication Protocols](https://flowdsl.com/docs/reference/grpc-protocol) for all supported protocols. ## `ephemeral` Redis / NATS / RabbitMQ queue. Survives brief spikes but not process restarts. - **Transport:** Redis / NATS / RabbitMQ - **Durability:** low - **Best for:** burst smoothing, rate-limiting ## `checkpoint` Mongo / Redis / Postgres backed. Progress is saved at each stage, enabling replay from any point. 
- **Transport:** Mongo / Redis / Postgres - **Durability:** stage-level - **Best for:** high-throughput pipelines that need replay ## `durable` Mongo / Postgres backed, packet-level acknowledgement. Every message is persisted before processing begins. - **Transport:** Mongo / Postgres - **Durability:** packet-level - **Best for:** business-critical steps (payments, order fulfilment) ## `stream` Kafka / Redis / NATS durable stream. Supports fan-out to multiple consumers and external integration. - **Transport:** Kafka / Redis / NATS - **Durability:** durable stream - **Best for:** external integration, fan-out, audit logging ## Choosing a mode | Scenario | Recommended mode | | --------------------------- | ---------------- | | In-process transform | `direct` | | Queue spikes, low stakes | `ephemeral` | | Long pipeline, need replay | `checkpoint` | | Money, orders, legal | `durable` | | External consumers, fan-out | `stream` | ## Delivery mode vs communication protocol Delivery mode and communication protocol are two separate concerns: - **Delivery mode** (on an edge) defines *how packets flow between nodes* — the durability guarantees and buffering strategy. - **Communication protocol** (on an edge) defines *the wire protocol for a specific connection* — gRPC, NATS, Redis, etc. Nodes declare which protocols they support via `runtime.supports`. They compose independently. An edge using NATS as its protocol can still have `durable` delivery mode — the runtime handles the translation between the edge's delivery transport and the connection's wire protocol. See [Communication Protocols](https://flowdsl.com/docs/reference/grpc-protocol) for all 9 supported protocols. # Getting Started FlowDSL describes executable event-driven flow graphs. A flow is a directed graph of **nodes** connected by **edges** with explicit delivery semantics. 
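Because a flow is just a directed graph, basic structural checks are easy to sketch. Assuming `nodes` is a mapping of node names and `edges` is a list of `{from, to}` entries (as in the examples in this documentation), a hypothetical check that every edge endpoint references a declared node might look like this — treating the part before a dot as the node name (output ports such as `RouteEmail.urgent` appear in later examples) is an assumption:

```python
# Illustrative graph check: every edge endpoint must name a declared node.
# Endpoints may carry an output-port suffix (e.g. "RouteEmail.urgent");
# splitting on the first dot to recover the node name is an assumption here.

def undeclared_endpoints(nodes: dict, edges: list[dict]) -> list[str]:
    """Return edge endpoints that don't resolve to a declared node."""
    declared = set(nodes)
    missing = []
    for edge in edges:
        for endpoint in (edge["from"], edge["to"]):
            node = endpoint.split(".", 1)[0]  # strip any ".port" suffix
            if node not in declared:
                missing.append(endpoint)
    return missing
```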
## Prerequisites

- An AsyncAPI document describing your event schemas (this quick start uses AsyncAPI references; FlowDSL can also define packets natively, with no AsyncAPI document at all)
- The FlowDSL Go or Python SDK installed

## Your first flow

Create a file called `hello.flowdsl.yaml`:

```yaml
flowdsl: "1.0"
info:
  title: Hello FlowDSL
  version: "1.0.0"
asyncapi: "./events.asyncapi.yaml"
nodes:
  Ingest:
    operationId: ingest_event
    description: Receives the incoming event
  Process:
    operationId: process_event
    description: Applies business logic
  Emit:
    operationId: emit_result
    description: Publishes the result
edges:
  - from: Ingest
    to: Process
    delivery:
      mode: direct
      packet: "asyncapi#/components/messages/RawEvent"
  - from: Process
    to: Emit
    delivery:
      mode: durable
      packet: "asyncapi#/components/messages/ProcessedEvent"
```

## Key rules

- Node names use `PascalCase`
- `operationId` values use `snake_case`
- Delivery policy lives on the **edge**, not the node
- AsyncAPI messages are referenced, never duplicated

## Next steps

- [Core Concepts](https://flowdsl.com/docs/concepts)
- [Delivery Modes](https://flowdsl.com/docs/concepts/delivery-modes)
- [AsyncAPI Integration](https://flowdsl.com/docs/asyncapi)

# AsyncAPI ↔ FlowDSL Integration

FlowDSL is self-contained but provides first-class support for AsyncAPI integration. If your team maintains AsyncAPI documents describing your event bus, you can reference those schemas directly in FlowDSL instead of duplicating them.

## When to use AsyncAPI integration

Use AsyncAPI references when:

- Your team already maintains AsyncAPI documents for your event bus
- The same event schemas are consumed by multiple systems (not just FlowDSL)
- You want AsyncAPI to remain the single source of truth for event contracts

Use native FlowDSL packets when:

- This flow is the only consumer of these packet schemas
- The schemas are internal to the flow and not published to other teams
- You are early in development and want to iterate quickly

## Setting up the integration

### 1.
Link the AsyncAPI document ```yaml flowdsl: "1.0" info: title: Order Processing version: "1.0.0" # Path or URL to the AsyncAPI document asyncapi: "./events.asyncapi.yaml" externalDocs: url: https://github.com/myorg/event-schemas/blob/main/asyncapi.yaml description: AsyncAPI event schema definitions (v2.6) ``` ### 2. Reference AsyncAPI messages ```yaml nodes: OrderReceived: operationId: receive_order kind: source outputs: out: packet: "asyncapi#/components/messages/OrderPlaced" edges: - from: OrderReceived to: ValidateOrder delivery: mode: durable packet: "asyncapi#/components/messages/OrderPlaced" ``` ### 3. Mix with native packets ```yaml components: packets: # Internal intermediate packet — not in AsyncAPI ValidationResult: type: object properties: orderId: { type: string } isValid: { type: boolean } errors: { type: array, items: { type: string } } required: [orderId, isValid] edges: - from: ValidateOrder to: ChargePayment delivery: packet: ValidationResult # Native packet ``` ## Runtime resolution At startup, the runtime: 1. Reads the `asyncapi` field and loads the document (local file or HTTP URL) 2. For each `asyncapi#/...` reference, extracts the `payload` schema from the referenced message 3. Compiles the resolved JSON Schema for packet validation 4. Validates all packets against their compiled schemas at runtime If the AsyncAPI document is at an HTTP URL, the runtime fetches it once at startup and caches it: ```yaml asyncapi: https://api.mycompany.com/asyncapi.yaml ``` ## Handling schema evolution ### Non-breaking changes (safe) These AsyncAPI schema changes do not break existing FlowDSL flows: - Adding optional fields to a message payload - Adding new messages (that the flow doesn't reference) - Changing field descriptions or metadata ### Breaking changes (require coordination) These changes will cause packet validation failures: - Removing required fields - Renaming fields - Changing field types - Changing `required` arrays **Recommended approach:** 1. 
**Version your AsyncAPI messages.** Add `v2` variants rather than modifying existing ones: ```yaml components: messages: OrderPlacedV1: # Keep existing payload: ... OrderPlacedV2: # New version with breaking changes payload: ... ``` 2. **Version the reference in FlowDSL:** ```yaml # old-flow.flowdsl.yaml packet: "asyncapi#/components/messages/OrderPlacedV1" # new-flow.flowdsl.yaml packet: "asyncapi#/components/messages/OrderPlacedV2" ``` 3. Deploy the new FlowDSL flow before stopping the old one to avoid gaps. ## Validation Both documents validate independently: ```bash # Validate the AsyncAPI document asyncapi validate events.asyncapi.yaml # Validate the FlowDSL document (also resolves asyncapi# references) flowdsl validate order-processing.flowdsl.yaml ``` The FlowDSL validator fails if: - The `asyncapi` file cannot be found or loaded - An `asyncapi#/...` JSON Pointer doesn't resolve to a valid message - The resolved message has no `payload` field ## Summary - Link AsyncAPI with `asyncapi: "./path/or/url"` at the document level - Reference messages with `asyncapi#/components/messages/MessageName` - Mix native and AsyncAPI packets freely - Version AsyncAPI messages to handle breaking schema changes safely - Both documents validate independently; FlowDSL also validates reference paths ## Next steps - [Packets concept](https://flowdsl.com/docs/concepts/packets) — native packet definitions - [Connecting AsyncAPI tutorial](https://flowdsl.com/docs/tutorials/connecting-asyncapi) — step-by-step integration - [Redelay Integration](https://flowdsl.com/docs/guides/redelay-integration) — AsyncAPI from Python/FastAPI # How to Choose the Right Delivery Mode Every edge in a FlowDSL flow has a delivery mode. Choosing the wrong mode is either costly (unnecessary MongoDB writes for cheap transforms) or dangerous (using `direct` for a payment charge). This guide gives you a systematic way to make that decision. 
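The questions in the decision tree below can also be encoded as a plain function. This is a hypothetical helper for illustration, not part of any FlowDSL SDK — each flag corresponds to one of the five questions:

```python
# Hypothetical helper mirroring the delivery-mode decision tree (Q1-Q5).
# Not part of the FlowDSL SDK; for illustration only.

def choose_delivery_mode(
    loss_unacceptable: bool,          # Q1
    expensive_external_call: bool,    # Q2 (LLM, payment, SMS, rate-limited API)
    needs_fan_out: bool,              # Q3 (external consumers, event sourcing)
    high_throughput_cheap_step: bool, # Q4 (>10k/sec, cheap, replayable)
    benefits_from_smoothing: bool,    # Q5 (worker pool decoupling)
) -> str:
    if loss_unacceptable:
        return "durable"
    if expensive_external_call:
        return "durable + idempotencyKey"
    if needs_fan_out:
        return "stream"
    if high_throughput_cheap_step:
        return "direct"
    if benefits_from_smoothing:
        return "ephemeral"
    return "checkpoint"
```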
## The decision tree ```mermaid flowchart TD Q1{"Is data loss\nunacceptable?"} Q2{"Expensive external call?\n(LLM, payment, SMS, API)"} Q3{"Fan-out to external systems\nor event sourcing?"} Q4{"Throughput >10k/sec\nand step is cheap?"} Q5{"Benefits from worker\npool smoothing?"} DQ[durable] DQ2["durable\n+ idempotencyKey"] EB[stream] D[direct] EQ[ephemeral] CP[checkpoint] Q1 -->|YES| DQ Q1 -->|NO| Q2 Q2 -->|YES| DQ2 Q2 -->|NO| Q3 Q3 -->|YES| EB Q3 -->|NO| Q4 Q4 -->|YES| D Q4 -->|NO| Q5 Q5 -->|YES| EQ Q5 -->|NO| CP ``` ### The questions **Q1: Is data loss unacceptable?** If losing this packet would cause a business problem — a payment not charged, a support ticket not created, a notification not sent — the answer is yes. → **durable** **Q2: Does this step involve an expensive external call?** LLM invocations, payment charges, email/SMS sends, calls to third-party APIs with rate limits or per-call costs — these are expensive. Re-running them unnecessarily is costly or dangerous. → **durable + idempotencyKey** The idempotency key is essential here because `durable` provides at-least-once delivery. Without it, a retry would call the LLM or send the SMS twice. **Q3: Do you need fan-out to external systems?** If multiple independent consumers need to react to this event — external services, analytics, audit logs, other teams' systems — publish to the event bus. Kafka's consumer group model handles independent consumption without flow changes. → **stream** **Q4: Is throughput >10k/sec and the step cheap?** High-volume, CPU-bound, deterministic steps (parsing, field extraction, format conversion) that run in microseconds and can be replayed from the source don't need a queue. In-process is fastest. → **direct** **Q5: Benefits from worker pool smoothing?** Medium-throughput steps with variable processing time benefit from a queue that decouples the producer and consumer rates. Redis streams provide this at low durability cost. 
→ **ephemeral** **Otherwise:** Long multi-stage pipelines where each step is expensive and you want resume-from-last-stage semantics. → **checkpoint** ## By workload class ### Stateful business workflow Each event is a complete unit of work with multiple external interactions. Data loss is unacceptable throughout. ```yaml # Email triage workflow — all durable edges: - from: EmailFetcher to: LlmAnalyzer delivery: mode: durable idempotencyKey: "{{payload.messageId}}-analyze" - from: LlmAnalyzer to: RouteEmail delivery: mode: durable - from: RouteEmail.urgent to: SendSmsAlert delivery: mode: durable idempotencyKey: "{{payload.messageId}}-sms" ``` ### High-throughput data pipeline High-volume event processing where throughput matters, stages are progressively more expensive, and the first stages can be replayed cheaply from the source. ```yaml # Telemetry pipeline — modes escalate in durability as value increases edges: # Fast, cheap parse — in-process - from: IngestTelemetry to: ParseEvent delivery: mode: direct # Medium throughput enrichment — worker smoothing - from: ParseEvent to: EnrichWithMeta delivery: mode: ephemeral stream: telemetry-enrich # Expensive aggregation — stage-level resume - from: EnrichWithMeta to: AggregateMetrics delivery: mode: checkpoint batchSize: 100 # LLM anomaly detection — expensive, must not duplicate - from: AggregateMetrics to: DetectAnomalies delivery: mode: durable idempotencyKey: "{{payload.windowId}}-anomaly" # Publish to downstream consumers - from: DetectAnomalies to: PublishAnomalyEvent delivery: mode: stream topic: telemetry.anomalies ``` ## Mode comparison at a glance | Scenario | Mode | Reason | | ----------------------- | ----------------------- | -------------------------------- | | JSON field extraction | `direct` | Cheap, in-process, deterministic | | Log parsing at 100k/sec | `direct` | Throughput > durability | | Burst absorption | `ephemeral` | Worker pool, variable rate | | Long ETL pipeline | `checkpoint` | 
Stage-level resume | | Payment charge | `durable` | Critical, must not lose | | LLM call | `durable + idempotency` | Expensive, non-deterministic | | Fanout to analytics | `stream` | Multiple independent consumers | | Send SMS | `durable + idempotency` | Irreversible side effect | | Archive spam | `direct` | Idempotent, cheap, fast | ## When in doubt Use `durable`. It has strong guarantees and reasonable performance (typically <5ms overhead on MongoDB). You can always optimize to `direct` or `ephemeral` after you have real performance data. The cost of under-engineering delivery semantics is data loss; the cost of over-engineering is a few milliseconds of latency. > **Default rule:** Start with `durable`. Add `idempotencyKey` whenever the node has side effects. Optimize later. ## Next steps - [Delivery Modes](https://flowdsl.com/docs/concepts/delivery-modes) — the full explanation of each mode - [Idempotency](https://flowdsl.com/docs/guides/idempotency) — implementing safe idempotent nodes - [Stateful vs Streaming](https://flowdsl.com/docs/guides/stateful-vs-streaming) — the two workload classes # Error Handling, Dead Letters, and Recovery FlowDSL has three layers of error handling: node-level errors, delivery-level retries, and dead letter queues. Understanding each layer and how they interact is essential for building flows that degrade gracefully and recover automatically. ## Node-level errors A node handler signals failure by returning a typed error. 
The error code determines whether the runtime retries: | Error code | Meaning | Runtime behavior | | -------------- | ------------------------------------------ | ---------------------------------- | | `VALIDATION` | Data problem — wrong schema, missing field | Dead letter immediately (no retry) | | `TIMEOUT` | Request timed out | Retry if policy configured | | `RATE_LIMITED` | External API rate limit | Retry with backoff | | `TEMPORARY` | Transient failure | Retry if policy configured | | `PERMANENT` | Permanent failure | Dead letter immediately (no retry) | ```go // Go: return typed errors if !payload.Has("orderId") { return flowdsl.NodeOutput{}, flowdsl.NewNodeError( flowdsl.ErrCodeValidation, "orderId is required", nil, ) } ``` ```python # Python: raise NodeError if not payload.get("orderId"): raise NodeError(ErrorCode.VALIDATION, "orderId is required") ``` ## Delivery-level retries Retries are configured on the edge, not the node. When a node returns a retriable error (`TIMEOUT`, `RATE_LIMITED`, `TEMPORARY`), the runtime waits according to the backoff policy and redelivers the packet: ```yaml edges: - from: PrepareOrder to: ChargePayment delivery: mode: durable packet: OrderPayload retryPolicy: maxAttempts: 4 backoff: exponential initialDelay: PT2S maxDelay: PT60S jitter: true retryOn: [TIMEOUT, TEMPORARY, RATE_LIMITED] ``` The runtime tracks retry count per packet. Each attempt is logged with a timestamp and error detail. 
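The `retryPolicy` above implies a concrete delay schedule. A sketch of how an exponential backoff with a cap and optional full jitter could be computed — the exact formula the runtime uses is an assumption; only the field names come from the example:

```python
import random

# Sketch of exponential backoff with a cap and optional full jitter,
# mirroring the retryPolicy example above (initialDelay PT2S, maxDelay PT60S).
# The runtime's exact formula is an assumption.

def backoff_delay(attempt: int, initial: float = 2.0, max_delay: float = 60.0,
                  jitter: bool = False) -> float:
    """Delay in seconds before retry number `attempt` (1 = first retry)."""
    delay = min(initial * (2 ** (attempt - 1)), max_delay)
    if jitter:
        # Full jitter: pick uniformly in [0, delay] to spread out retry storms.
        delay = random.uniform(0, delay)
    return delay
```

Without jitter this yields 2s, 4s, 8s, … capped at 60s; with `jitter: true` each delay is drawn uniformly below that bound.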
## Dead letter queues

When all retry attempts are exhausted (or the error is non-retriable), the packet moves to a dead letter queue:

- **Location:** MongoDB collection `{flowId}.dead_letters`
- **Content:** Original packet, last error, all attempt timestamps, node ID, flow ID
- **Retention:** Configurable (default: 30 days)

```json
{
  "_id": "dlq-ord-001-charge",
  "flowId": "order_fulfillment",
  "nodeId": "ChargePayment",
  "operationId": "charge_payment",
  "packet": { "orderId": "ord-001", "amount": 99.99, "currency": "USD" },
  "lastError": {
    "code": "TEMPORARY",
    "message": "Payment processor unavailable",
    "timestamp": "2026-03-28T10:00:14Z"
  },
  "attempts": [
    { "timestamp": "2026-03-28T10:00:00Z", "error": "Connection timeout" },
    { "timestamp": "2026-03-28T10:00:02Z", "error": "Connection timeout" },
    { "timestamp": "2026-03-28T10:00:06Z", "error": "Payment processor unavailable" },
    { "timestamp": "2026-03-28T10:00:14Z", "error": "Payment processor unavailable" }
  ],
  "createdAt": "2026-03-28T10:00:00Z",
  "deadLetteredAt": "2026-03-28T10:00:14Z"
}
```

## Configuring dead letter queues

```yaml
edges:
  - from: ValidateOrder
    to: ChargePayment
    delivery:
      mode: durable
      packet: ValidatedOrder
      retryPolicy:
        maxAttempts: 4
        backoff: exponential
        initialDelay: PT2S
      deadLetterQueue: payment-failures # Named DLQ (optional, defaults to flowId.dead_letters)
```

Named dead letter queues allow different monitoring and recovery strategies per error type.
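When retries are exhausted, the runtime writes a record shaped like the JSON above. A sketch of assembling one — the field names follow the documented record shape, but the helper itself is illustrative, not runtime code:

```python
# Illustrative assembly of a dead-letter record shaped like the example above.
# Field names follow the documented shape; this helper is hypothetical.

def build_dead_letter(flow_id: str, node_id: str, operation_id: str,
                      packet: dict, attempts: list[dict],
                      error_code: str = "TEMPORARY") -> dict:
    """attempts: list of {"timestamp": ..., "error": ...}, oldest first."""
    last = attempts[-1]
    return {
        "_id": f"dlq-{packet.get('orderId', 'unknown')}-{node_id.lower()}",
        "flowId": flow_id,
        "nodeId": node_id,
        "operationId": operation_id,
        "packet": packet,
        "lastError": {
            "code": error_code,
            "message": last["error"],
            "timestamp": last["timestamp"],
        },
        "attempts": attempts,
        "createdAt": attempts[0]["timestamp"],
        "deadLetteredAt": last["timestamp"],
    }
```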
## Inspecting dead letters Via the runtime API: ```bash # List all dead letters for a flow curl http://localhost:8081/flows/order_fulfillment/dead-letters # Get dead letter details curl http://localhost:8081/flows/order_fulfillment/dead-letters/dlq-ord-001-charge ``` Via Studio: - Open the flow → click the **Dead Letters** tab - Inspect the full packet and error chain - Re-inject selected packets after fixing the underlying issue ## Manual recovery: re-injecting dead letters After fixing the underlying issue (the payment processor is back up, the schema was corrected), re-inject the packet to restart processing: ```bash # Re-inject a specific dead letter packet curl -X POST http://localhost:8081/flows/order_fulfillment/dead-letters/dlq-ord-001-charge/reinject ``` The re-injected packet goes back to the same node with the same idempotency key (if set). If the node handler previously completed before the crash that caused the dead letter, the idempotency key prevents a duplicate action. ## Configuring dead letter alerts Add alerting via `x-alerts` extension: ```yaml nodes: ChargePayment: operationId: charge_payment kind: action x-alerts: onDeadLetter: webhook: https://hooks.slack.com/services/... message: "Payment failed for order {{packet.orderId}}" channels: ["#payments-alerts"] ``` ## `direct` delivery and errors For `direct` edges, errors propagate immediately to the caller (no queue, no retry): ```text WebhookReceiver → [direct] → JsonTransformer ``` If `JsonTransformer` throws an error, the webhook response returns HTTP 500. The client is responsible for retrying. Use `direct` only for steps where immediate error propagation is acceptable. 
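On a `direct` edge the failure surface is just a call stack. A minimal sketch of the webhook example above, with all names illustrative — the point is that the transformer's exception surfaces immediately in the caller, which maps it to an HTTP status:

```python
import json

# Sketch of error propagation on a `direct` edge: the transformer's exception
# surfaces immediately in the caller (no queue, no retry). Names illustrative.

def json_transformer(raw: str) -> dict:
    return json.loads(raw)  # raises ValueError on malformed input

def webhook_receiver(raw_body: str) -> tuple[int, str]:
    try:
        json_transformer(raw_body)  # direct edge: a plain in-process call
        return 200, "accepted"
    except ValueError as exc:
        # No retry policy applies here — the HTTP client must retry itself.
        return 500, f"transform failed: {exc}"
```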
## Error handling for `stream` edges For `stream` delivery (Kafka), error handling is managed by the Kafka consumer group: - Failed messages are retried by the consumer group's retry mechanism - After max retries, messages go to a Kafka dead letter topic: `{originalTopic}.dead-letter` - The FlowDSL runtime creates this topic automatically ## Summary | Layer | Mechanism | Configured on | | -------------- | --------------------------- | ---------------------------- | | Node error | Typed error codes | Node handler code | | Delivery retry | `retryPolicy` | Edge delivery policy | | Dead letter | Automatic after max retries | Edge `deadLetterQueue` field | | Alert | `x-alerts` extension | Node definition | | Recovery | Re-inject via API or Studio | Manual or automated | ## Next steps - [Retry Policies](https://flowdsl.com/docs/concepts/retry-policies) — backoff strategies in detail - [Idempotency](https://flowdsl.com/docs/guides/idempotency) — preventing duplicate side effects on retry - [RetryPolicy reference](https://flowdsl.com/docs/reference/spec/retry-policy) — full field reference # High-Throughput Pipelines This guide covers performance optimization for FlowDSL flows processing tens of thousands of events per second. It addresses delivery mode throughput limits, checkpoint tuning, batching, and parallelism. ## Throughput targets by delivery mode | Mode | Approx. throughput | Limiting factor | | ------------ | ------------------- | ----------------------------------------- | | `direct` | 500k–1M+ events/sec | CPU, memory bandwidth | | `ephemeral` | 50k–100k events/sec | Redis / NATS / RabbitMQ throughput | | `checkpoint` | 5k–20k events/sec | Mongo / Redis / Postgres write throughput | | `durable` | 2k–10k events/sec | Mongo / Postgres write + index | | `stream` | 100k+ events/sec | Kafka / NATS throughput | These are rough targets. Actual throughput depends on payload size, node processing time, hardware, and configuration. 
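The table above supports a quick back-of-envelope check. The sketch below restates the conservative end of each range as data — these numbers are the table's rough targets, not measurements:

```python
# Back-of-envelope capacity check against the rough targets above.
# Values are the conservative end of each documented range, events/sec;
# actual throughput depends on payload size, node cost, and hardware.
ROUGH_LIMITS = {
    "direct": 500_000,
    "ephemeral": 50_000,
    "checkpoint": 5_000,
    "durable": 2_000,
    "stream": 100_000,
}

def modes_that_fit(target_events_per_sec: int) -> list[str]:
    """Delivery modes whose conservative target meets the required rate."""
    return [m for m, limit in ROUGH_LIMITS.items()
            if limit >= target_events_per_sec]
```

For example, a 10k events/sec stage rules out `durable` and `checkpoint` at their conservative targets, pointing you toward the tuning techniques in the next section if you need durability at that rate.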
## Design for throughput

### Use `direct` for the hot path

The cheapest, fastest stages should use `direct`. Reserve `durable` and `checkpoint` for stages where durability is genuinely needed:

```yaml
edges:
  # Fast path: parse + validate + filter — all direct
  - from: Ingest
    to: Parse
    delivery: { mode: direct }
  - from: Parse
    to: Validate
    delivery: { mode: direct }
  - from: Validate
    to: Filter
    delivery: { mode: direct }
  # Durability only where needed
  - from: Filter
    to: Enrich
    delivery: { mode: ephemeral, stream: enrich-q }
  - from: Enrich
    to: Store
    delivery:
      mode: durable
      packet: EnrichedEvent
```

### Batch with `checkpoint`

The `batchSize` field on checkpoint edges accumulates N packets before writing to MongoDB. This dramatically reduces MongoDB write operations:

```yaml
edges:
  - from: ParseLog
    to: AggregateMetrics
    delivery:
      mode: checkpoint
      packet: ParsedLog
      batchSize: 1000          # Write checkpoint every 1000 packets — not every 1
      checkpointInterval: 5000 # Also checkpoint every 5000 ms, even if the batch isn't full
```

At 10k events/sec with batchSize 1000, MongoDB writes drop from 10k/sec to 10/sec.

### Use `ephemeral` for burst absorption

If upstream produces bursts and downstream is slower, `ephemeral` smooths the rate:

```yaml
edges:
  - from: WebhookReceiver # Bursty ingest: 0–50k/sec spikes
    to: ProcessEvent      # Steady consumer: 5k/sec
    delivery:
      mode: ephemeral
      stream: event-processing-queue
      maxLen: 500000 # Allow up to 500k buffered events
```

`maxLen` prevents Redis memory exhaustion during sustained overload.

## Parallelism

### Multiple node instances

Run multiple instances of the same node to process in parallel.
Each instance registers the same `operationId` — the runtime load-balances: ```yaml # node-registry.yaml nodes: process_event: instances: - address: localhost:8080 - address: localhost:8081 - address: localhost:8082 version: "1.0.0" ``` For `ephemeral` edges, the runtime uses Redis consumer groups — multiple instances consume from the same stream without duplicating work. ### Kafka partitioning for `stream` Kafka scales horizontally through partitioning. More partitions → more consumer instances → higher throughput: ```yaml edges: - from: ProcessOrder to: PublishOrderEvent delivery: mode: stream topic: orders.processed # Kafka will partition by key automatically # Add more partitions via Kafka admin when you need more parallelism ``` ## Redis tuning for `ephemeral` ```yaml # docker-compose.yaml or Redis config redis: command: > redis-server --save "" # Disable persistence for max throughput (data is ephemeral) --maxmemory 2gb --maxmemory-policy allkeys-lru ``` For maximum ephemeral throughput, disable Redis persistence (`--save ""`). Since `ephemeral` provides no durability guarantee, there's no point persisting the stream to disk. 
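Dispatching across the registered instances can be as simple as round-robin. The sketch below is illustrative — this guide does not specify the runtime's actual load-balancing strategy:

```python
from itertools import cycle

# Sketch of round-robin dispatch across registered node instances,
# mirroring the node-registry example above. The runtime's real
# load-balancing strategy is not specified here; this is illustrative.

class InstancePool:
    def __init__(self, addresses: list[str]):
        self._ring = cycle(addresses)

    def next_address(self) -> str:
        """Return the address of the next instance to receive a packet."""
        return next(self._ring)
```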
## MongoDB tuning for `checkpoint` and `durable` ```text # MongoDB connection string for high throughput MONGODB_URI=mongodb://localhost:27017/flowdsl?maxPoolSize=50&minPoolSize=10&maxIdleTimeMS=120000 ``` Key MongoDB settings: - `maxPoolSize: 50` — allow up to 50 concurrent connections from the runtime - Create indexes on `{flowId}.packets`: `{executionId: 1, nodeId: 1}` - Use a write concern of `{w: 1}` for checkpoint edges (not `{w: majority}`) to reduce write latency ## Profiling FlowDSL flows The runtime exposes Prometheus metrics at `/metrics`: ```text # Key metrics to watch flowdsl_node_duration_seconds{node="ProcessEvent"} # Node processing time flowdsl_edge_delivery_duration_seconds{mode="checkpoint"} # Delivery overhead flowdsl_queue_depth{stream="enrich-q"} # Backlog size flowdsl_dead_letter_count{flow="pipeline_v2"} # Error rate ``` Add to your Grafana dashboard and alert on: - `queue_depth > 100000` (backpressure building) - `dead_letter_count > 0` (errors requiring attention) - `node_duration_seconds p99 > 5` (slow nodes) ## Summary | Technique | When to apply | | ------------------------- | -------------------------------------------------- | | `direct` for hot path | Always, for cheap deterministic transforms | | `batchSize` on checkpoint | When writing to MongoDB at >1k events/sec | | `maxLen` on ephemeral | When upstream can burst beyond downstream capacity | | Multiple node instances | When a single node is CPU-bound | | Redis persistence off | For `ephemeral` at maximum throughput | | MongoDB connection pool | When `durable` write latency is high | ## Next steps - [Delivery Modes](https://flowdsl.com/docs/concepts/delivery-modes) — mode characteristics and guarantees - [Stateful vs Streaming](https://flowdsl.com/docs/guides/stateful-vs-streaming) — choosing the right workload model - [Checkpoints](https://flowdsl.com/docs/concepts/checkpoints) — checkpoint mechanics # Writing Idempotent Nodes FlowDSL's `durable` delivery mode provides 
at-least-once delivery — a packet may be delivered more than once if the process crashes between execution and acknowledgment. Nodes with side effects (sending emails, charging payments, calling external APIs) must be idempotent to handle this safely. ## What idempotency means A function is idempotent if calling it multiple times with the same input produces the same observable result as calling it once. For a FlowDSL node: - An idempotent SMS sender sends the SMS once, even if the handler is called twice - An idempotent order creator creates the order once, even if the packet is redelivered - An idempotent LLM node calls the LLM once per document, even after a crash-retry ## The idempotencyKey field The primary tool for idempotency in FlowDSL is the `idempotencyKey` field on a delivery policy: ```yaml edges: - from: ClassifyEmail to: SendSmsAlert delivery: mode: durable packet: AlertPayload idempotencyKey: "{{payload.messageId}}-sms-alert" ``` The template uses `{{payload.field}}` syntax and is evaluated against each packet. The result must be **globally unique** for the intended logical operation. ### How the runtime uses it 1. Before delivering a packet, the runtime computes the idempotency key. 2. It checks MongoDB's `{flowId}.idempotency_keys` collection for an existing record with that key. 3. If found and marked `completed`: the packet is acknowledged without calling the node. 4. If found and marked `in_progress`: the packet is held until the in-progress execution completes. 5. If not found: the packet is delivered to the node. After the node returns successfully, the key is marked `completed`. This prevents duplicate side effects even when the same packet is delivered multiple times. 
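The five steps above can be sketched with a toy in-memory store. The real runtime tracks keys in the MongoDB `{flowId}.idempotency_keys` collection; everything below is illustrative, including the template-rendering helper:

```python
import re

# Toy in-memory version of the runtime's idempotency check (steps 1-5 above).
# The real implementation stores keys in MongoDB; this is illustrative only.

def render_key(template: str, payload: dict) -> str:
    """Evaluate '{{payload.field}}' placeholders against a packet payload."""
    return re.sub(r"\{\{payload\.(\w+)\}\}",
                  lambda m: str(payload[m.group(1)]), template)

def deliver(store: dict, template: str, payload: dict, handler) -> str:
    key = render_key(template, payload)    # step 1: compute the key
    state = store.get(key)                 # step 2: look it up
    if state == "completed":               # step 3: ack without calling the node
        return "skipped"
    if state == "in_progress":             # step 4: hold until in-flight work ends
        return "held"
    store[key] = "in_progress"             # step 5: deliver, then mark completed
    handler(payload)
    store[key] = "completed"
    return "delivered"
```

Redelivering the same packet with the same key then becomes a no-op, which is exactly what prevents the duplicate SMS or duplicate charge.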
## Idempotency key design Good idempotency keys are: | Property | Explanation | | ------------------------- | ----------------------------------------------------- | | **Unique per operation** | `{entityId}-{operation}`, not just `{entityId}` | | **Stable across retries** | The same packet always produces the same key | | **Not reusable** | Never reuse a key for a logically different operation | ```yaml # GOOD: unique per entity and operation idempotencyKey: "{{payload.orderId}}-charge-payment" idempotencyKey: "{{payload.messageId}}-sms-alert" idempotencyKey: "{{payload.documentId}}-summarize-v2" # BAD: too generic — collides across different operations idempotencyKey: "{{payload.orderId}}" idempotencyKey: "{{payload.id}}" ``` ## Implementing idempotency in Go ```go func (n *SmsAlertNode) Handle(ctx context.Context, input flowdsl.NodeInput) (flowdsl.NodeOutput, error) { payload, err := input.Packet("in") if err != nil { return flowdsl.NodeOutput{}, err } messageId, _ := payload.GetString("messageId") // The runtime has already checked the idempotency key before calling Handle. // If we're here, it's safe to proceed — this is the first call for this key. // However, external APIs may have their own idempotency mechanisms. 
// Pass the idempotency key to the Twilio SDK idempotencyKey := input.Context().IdempotencyKey result, err := n.twilio.SendSMS(ctx, &twilio.SMSParams{ To: payload.GetStringOr("phoneNumber", ""), Body: payload.GetStringOr("message", ""), IdempotencyKey: idempotencyKey, // Twilio deduplicates on their end too }) if err != nil { if isTwilioRateLimit(err) { return flowdsl.NodeOutput{}, flowdsl.NewNodeError(flowdsl.ErrCodeRateLimited, "Twilio rate limit", err) } return flowdsl.NodeOutput{}, flowdsl.NewNodeError(flowdsl.ErrCodeTemporary, "Twilio SMS failed", err) } return flowdsl.NodeOutput{}.Send("out", map[string]any{ "sid": result.SID, "status": result.Status, }), nil } ``` ## Implementing idempotency in Python ```python class CreateTicketNode(BaseNode): operation_id = "create_support_ticket" async def handle(self, input: NodeInput) -> NodeOutput: payload = await input.packet("in") ticket_id_source = payload.get("email", {}).get("messageId") # Use the idempotency key from the edge policy (set by the runtime) idempotency_key = input.context.idempotency_key # Check our own store first (for external systems that don't support idempotency) existing = await self._db.get_ticket_by_idempotency_key(idempotency_key) if existing: # Already created — return the existing ticket without calling the API return NodeOutput().send("out", existing) # Create the ticket ticket = await self._zendesk.create_ticket( subject=payload.get("email", {}).get("subject", ""), body=payload.get("email", {}).get("body", ""), priority=payload.get("classification"), ) # Store our own record for idempotency await self._db.store_idempotency_record(idempotency_key, ticket) return NodeOutput().send("out", ticket) ``` ## External API idempotency Many external APIs have their own idempotency mechanisms. 
Use them in addition to FlowDSL's built-in key tracking:

| API      | Idempotency mechanism                            |
| -------- | ------------------------------------------------ |
| Stripe   | `Idempotency-Key` header                         |
| Twilio   | `X-Twilio-Idempotency`                           |
| SendGrid | No native support — track in your database       |
| Zendesk  | No native support — check for existing tickets   |
| OpenAI   | No native support — use FlowDSL's built-in dedup |

## Database idempotency patterns

For databases, use upsert operations instead of inserts:

```go
// WRONG: Insert fails on duplicate — causes error, triggers retry
_, err = db.Collection("orders").InsertOne(ctx, order)

// CORRECT: Upsert is idempotent — safe to run multiple times
_, err = db.Collection("orders").UpdateOne(ctx,
	bson.M{"orderId": order.OrderID},
	bson.M{"$setOnInsert": order},
	options.Update().SetUpsert(true),
)
```

In Python with MongoDB:

```python
await db.orders.update_one(
    {"orderId": order["orderId"]},
    {"$setOnInsert": order},
    upsert=True,
)
```

## Testing idempotency

```go
func TestSmsAlertIdempotency(t *testing.T) {
	// Wire the mock client into the node so the test can observe sends.
	twilio := &MockTwilioClient{}
	node := &SmsAlertNode{twilio: twilio}

	payload := flowdsl.NewPacket(map[string]any{
		"messageId":   "msg-001",
		"phoneNumber": "+15550100300",
		"message":     "Production alert: database unreachable",
	})
	input := flowdsl.MockNodeInput("in", payload,
		flowdsl.WithIdempotencyKey("msg-001-sms"),
	)

	// First call — should send SMS
	_, err := node.Handle(context.Background(), input)
	require.NoError(t, err)
	assert.Equal(t, 1, twilio.SentCount())

	// Second call with same idempotency key — should NOT send SMS.
	// The mock deduplicates on IdempotencyKey, mirroring Twilio's behavior.
	_, err = node.Handle(context.Background(), input)
	require.NoError(t, err)
	assert.Equal(t, 1, twilio.SentCount()) // still 1, not 2
}
```

## Summary

| Pattern                  | Where to apply                                    |
| ------------------------ | ------------------------------------------------- |
| `idempotencyKey` on edge | All `durable` edges with side effects             |
| Pass key to external API | When the API supports its own idempotency header  |
| Check-before-create      | For APIs without native idempotency               |
| Upsert instead of insert | For all database writes in node handlers          |
| Unique key per operation | `{entityId}-{operation}`, never just `{entityId}` |

## Next steps

- [Retry Policies](https://flowdsl.com/docs/concepts/retry-policies) — configuring retry behavior
- [Error Handling](https://flowdsl.com/docs/guides/error-handling) — dead letters and recovery
- [LLM Flows](https://flowdsl.com/docs/guides/llm-flows) — idempotency for expensive LLM calls

# Guides

Guides cover specific decisions and patterns that don't fit neatly into a tutorial or reference page. They answer "how do I approach X?" rather than "what does X mean?" or "how do I build X step by step?"

## Pages in this section

### [Choosing Delivery Modes](https://flowdsl.com/docs/guides/choosing-delivery-modes)

A decision tree for selecting the right delivery mode for each edge in your flow. Covers the key questions: is data loss acceptable? is this an expensive external call? do you need fan-out?

### [Stateful Workflows vs Streaming Pipelines](https://flowdsl.com/docs/guides/stateful-vs-streaming)

Two fundamentally different workload classes in FlowDSL. Understand which you're building and how it affects your node design, delivery mode choices, and operational patterns.

### [Idempotency](https://flowdsl.com/docs/guides/idempotency)

How to make node handlers safe to retry and replay. Covers `idempotencyKey` configuration, deduplication patterns in Go and Python, and external API idempotency.

### [Error Handling](https://flowdsl.com/docs/guides/error-handling)

Dead letters, retry behavior, circuit breakers, and recovery patterns. How to build flows that degrade gracefully and recover automatically.

### [High-Throughput Pipelines](https://flowdsl.com/docs/guides/high-throughput-pipelines)

Batching, checkpoint interval tuning, parallelism, and performance targets for each delivery mode. For teams moving beyond prototype scale.
### [LLM Flows](https://flowdsl.com/docs/guides/llm-flows)

Building AI agent pipelines with FlowDSL — the right delivery modes, idempotency patterns, cost management, and complete example flows for document intelligence and support automation.

### [AsyncAPI Integration](https://flowdsl.com/docs/guides/asyncapi-integration)

Full guide to the AsyncAPI ↔ FlowDSL integration. Schema referencing, runtime resolution, validation, schema evolution, and breaking change handling.

### [Redelay Integration](https://flowdsl.com/docs/guides/redelay-integration)

How to use [redelay](https://redelay.com){rel="nofollow"} (Python/FastAPI event framework) as a FlowDSL backend, including automatic AsyncAPI generation from Pydantic events.

### [Node Development](https://flowdsl.com/docs/guides/node-development)

How to develop, test, version, and publish FlowDSL nodes. Covers the manifest format, local development workflow, and the node registry.

# Building AI Agent Flows with FlowDSL

FlowDSL is exceptionally well-suited for orchestrating LLM workflows. The core challenge with LLM pipelines is that LLM calls are expensive, non-deterministic, and have side effects — exactly the conditions where FlowDSL's delivery semantics provide the most value.

## Why LLM steps need `durable`

An LLM call has three properties that make transport semantics critical:

1. **Expensive** — each call costs money. Re-running unnecessarily wastes budget.
2. **Non-deterministic** — calling the same prompt twice may return different results. Retry-on-failure could produce inconsistent pipeline state.
3. **Slow** — LLM calls take 0.5–10 seconds. A process crash during that window is common.

`durable` with `idempotencyKey` solves all three:

- The packet is persisted before delivery — if the process crashes during the LLM call, the packet survives and is redelivered to the same node.
- The idempotency key prevents the LLM from being called again if the packet is redelivered after the call already completed.
- The runtime acknowledges the packet only after the node handler returns successfully. ```yaml edges: - from: PrepareContext to: LlmSummarize delivery: mode: durable packet: DocumentContext idempotencyKey: "{{payload.documentId}}-summarize-v1" retryPolicy: maxAttempts: 3 backoff: exponential initialDelay: PT5S maxDelay: PT60S retryOn: [RATE_LIMITED, TIMEOUT] ``` ## Why idempotency is critical for LLM nodes Consider a document summarizer that: 1. Receives a document chunk 2. Calls GPT-4 to summarize it (takes 3 seconds) 3. Stores the result in the database 4. Acknowledges the packet If the process crashes at step 3, the runtime will redeliver the packet. Without idempotency: - The LLM is called again (costs money, may return different text) - Two different summaries may be stored for the same chunk - Downstream nodes receive inconsistent state With `idempotencyKey: "{{payload.chunkId}}-summarize"`: - The runtime checks if this key was already processed - If yes, the stored result is returned without calling the LLM again - Consistent, cost-efficient, deterministic ## Common LLM node types ### LlmAnalyzer Classifies or analyzes input. Returns structured JSON output. ```yaml LlmAnalyzer: operationId: llm_analyze_content kind: llm inputs: in: { packet: ContentPayload } outputs: out: { packet: AnalysisResult } settings: model: gpt-4o-mini temperature: 0.1 responseFormat: json_object systemPrompt: | Analyze the content and return structured JSON with: classification, confidence (0-1), key_entities, sentiment ``` ### LlmRouter Classifies input and returns a routing decision. Use a `router` kind node that reads the LLM output. 
```yaml LlmClassifier: operationId: llm_classify_for_routing kind: llm settings: systemPrompt: "Return JSON: {\"route\": \"path_a|path_b|path_c\"}" RouteOnClassification: operationId: route_on_llm_classification kind: router inputs: in: { packet: ClassificationResult } outputs: path_a: { packet: ClassificationResult } path_b: { packet: ClassificationResult } path_c: { packet: ClassificationResult } ``` ### LlmSummarizer Reduces a long document to a shorter summary. ```yaml LlmSummarize: operationId: llm_summarize_document kind: llm inputs: in: { packet: DocumentChunks } outputs: out: { packet: DocumentSummary } settings: model: gpt-4o maxTokens: 500 systemPrompt: "Summarize this document in 3-5 sentences. Focus on key findings." ``` ### LlmExtractor Extracts structured data from unstructured text. ```yaml LlmExtract: operationId: llm_extract_entities kind: llm inputs: in: { packet: RawText } outputs: out: { packet: ExtractedEntities } settings: systemPrompt: | Extract: names, organizations, dates, amounts. Return JSON: {"names": [...], "organizations": [...], "dates": [...], "amounts": [...]} ``` ## Example flow 1: Document Intelligence Pipeline Processes uploaded documents through extraction, chunking, embedding, summarization, and indexing. 
```mermaid flowchart LR A[UploadReceived] --> B[ExtractText] B --> C[ChunkDocument] C --> D[EmbedChunks] D --> E[LlmSummarize] E --> F[LlmExtractFacts] F --> G[IndexDocument] ``` ```yaml flowdsl: "1.0" info: title: Document Intelligence Pipeline version: "1.0.0" nodes: UploadReceived: operationId: receive_document_upload kind: source outputs: out: { packet: DocumentUpload } ExtractText: operationId: extract_pdf_text kind: transform inputs: in: { packet: DocumentUpload } outputs: out: { packet: ExtractedText } ChunkDocument: operationId: chunk_document_text kind: transform inputs: in: { packet: ExtractedText } outputs: out: { packet: DocumentChunks } settings: chunkSize: 1000 overlap: 100 EmbedChunks: operationId: embed_document_chunks kind: action inputs: in: { packet: DocumentChunks } outputs: out: { packet: EmbeddedChunks } settings: embeddingModel: text-embedding-3-small batchSize: 20 LlmSummarize: operationId: llm_summarize_document kind: llm inputs: in: { packet: EmbeddedChunks } outputs: out: { packet: DocumentWithSummary } settings: model: gpt-4o systemPrompt: "Summarize the document in 3-5 sentences focusing on key insights." LlmExtractFacts: operationId: llm_extract_document_facts kind: llm inputs: in: { packet: DocumentWithSummary } outputs: out: { packet: DocumentWithFacts } settings: model: gpt-4o-mini systemPrompt: "Extract key facts, dates, names, and figures as structured JSON." 
IndexDocument: operationId: index_document_in_search kind: action inputs: in: { packet: DocumentWithFacts } edges: - from: UploadReceived to: ExtractText delivery: mode: direct packet: DocumentUpload - from: ExtractText to: ChunkDocument delivery: mode: checkpoint packet: ExtractedText - from: ChunkDocument to: EmbedChunks delivery: mode: checkpoint packet: DocumentChunks batchSize: 10 - from: EmbedChunks to: LlmSummarize delivery: mode: durable packet: EmbeddedChunks idempotencyKey: "{{payload.documentId}}-summarize" retryPolicy: maxAttempts: 3 backoff: exponential initialDelay: PT5S retryOn: [RATE_LIMITED, TIMEOUT] - from: LlmSummarize to: LlmExtractFacts delivery: mode: durable packet: DocumentWithSummary idempotencyKey: "{{payload.documentId}}-extract-facts" retryPolicy: maxAttempts: 3 backoff: exponential initialDelay: PT3S - from: LlmExtractFacts to: IndexDocument delivery: mode: durable packet: DocumentWithFacts idempotencyKey: "{{payload.documentId}}-index" ``` **Key patterns:** - `checkpoint` before LLM stages — if the embedding fails, resume without re-extracting - `durable` at each LLM step — packet-level guarantee - Unique `idempotencyKey` per LLM operation — never call the same LLM twice for the same document ## Example flow 2: Support Ticket Auto-Resolver Automatically resolves support tickets using LLM classification, knowledge base search, and response drafting. 
```mermaid
flowchart LR
  A[TicketReceived] --> B[ClassifyTicket]
  B --> C[SearchKnowledgeBase]
  C --> D[DraftResponse]
  D --> E{ConfidenceRouter}
  E -->|high confidence| F[AutoReply]
  E -->|low confidence| G[RouteToHuman]
```

```yaml
flowdsl: "1.0"
info:
  title: Support Ticket Auto-Resolver
  version: "1.0.0"
nodes:
  TicketReceived:
    operationId: receive_support_ticket
    kind: source
    outputs:
      out: { packet: TicketPayload }
  ClassifyTicket:
    operationId: llm_classify_ticket
    kind: llm
    inputs:
      in: { packet: TicketPayload }
    outputs:
      out: { packet: ClassifiedTicket }
    settings:
      model: gpt-4o-mini
      systemPrompt: |
        Classify this support ticket by category and complexity.
        Return JSON: {"category": "...", "complexity": "simple|complex", "urgency": "low|medium|high"}
  SearchKnowledgeBase:
    operationId: search_knowledge_base
    kind: action
    inputs:
      in: { packet: ClassifiedTicket }
    outputs:
      out: { packet: TicketWithContext }
    settings:
      maxResults: 5
      similarityThreshold: 0.75
  DraftResponse:
    operationId: llm_draft_response
    kind: llm
    inputs:
      in: { packet: TicketWithContext }
    outputs:
      out: { packet: TicketWithDraft }
    settings:
      model: gpt-4o
      systemPrompt: |
        Draft a helpful, professional support response.
        Use the provided knowledge base context.
        Return JSON: {"response": "...", "confidence": 0.0-1.0, "sources": [...]}
  ConfidenceRouter:
    operationId: route_by_confidence
    kind: router
    inputs:
      in: { packet: TicketWithDraft }
    outputs:
      auto_reply: { packet: TicketWithDraft }
      human_review: { packet: TicketWithDraft }
    settings:
      confidenceThreshold: 0.85
  AutoReply:
    operationId: send_auto_reply
    kind: action
    inputs:
      in: { packet: TicketWithDraft }
  RouteToHuman:
    operationId: route_to_human_agent
    kind: action
    inputs:
      in: { packet: TicketWithDraft }
edges:
  - from: TicketReceived
    to: ClassifyTicket
    delivery:
      mode: durable
      packet: TicketPayload
      idempotencyKey: "{{payload.ticketId}}-classify"
      retryPolicy:
        maxAttempts: 3
        backoff: exponential
        initialDelay: PT3S
  - from: ClassifyTicket
    to: SearchKnowledgeBase
    delivery:
      mode: durable
      packet: ClassifiedTicket
  - from: SearchKnowledgeBase
    to: DraftResponse
    delivery:
      mode: durable
      packet: TicketWithContext
      idempotencyKey: "{{payload.ticketId}}-draft"
      retryPolicy:
        maxAttempts: 3
        backoff: exponential
        initialDelay: PT5S
  - from: DraftResponse
    to: ConfidenceRouter
    delivery:
      mode: durable
      packet: TicketWithDraft
  - from: ConfidenceRouter.auto_reply
    to: AutoReply
    delivery:
      mode: durable
      packet: TicketWithDraft
      idempotencyKey: "{{payload.ticketId}}-auto-reply"
  - from: ConfidenceRouter.human_review
    to: RouteToHuman
    delivery:
      mode: durable
      packet: TicketWithDraft
      idempotencyKey: "{{payload.ticketId}}-human-route"
```

Note that `confidenceThreshold` lives on `ConfidenceRouter` — the node that makes the routing decision — not on the downstream `AutoReply` action, which only ever receives tickets that already cleared the threshold.

## Prompt management via settings

Static prompts live in the `settings` field of the node definition. This keeps them version-controlled alongside the flow and visible in Studio:

```yaml
nodes:
  LlmClassifier:
    operationId: llm_classify
    kind: llm
    settings:
      model: gpt-4o-mini
      temperature: 0.1
      systemPrompt: |
        You are an expert classifier. Classify the input as...
      maxTokens: 500
      responseFormat: json_object
```

For dynamic prompts (per-execution), pass them as part of the input packet schema.

## Cost awareness

Use `checkpoint` before expensive LLM stages.
If the pipeline fails after a cheap ETL step but before the LLM, you want to resume from the checkpoint — not re-run the ETL: ```text ExtractText (cheap) → checkpoint → LlmSummarize (expensive) ``` If `LlmSummarize` fails, the retry starts from the checkpoint with the already-extracted text, not from `ExtractText`. **Estimated per-stage costs:** - Text extraction: \~0.001 USD/doc - Embedding (1000 chunks): \~0.01 USD/doc - GPT-4o summarization: \~0.05 USD/doc - GPT-4o fact extraction: \~0.03 USD/doc With checkpointing, a retry at the summarization step costs \~0.05 USD, not \~0.06 USD. Over thousands of documents, checkpoints save significant budget. ## Handling LLM failures LLM APIs fail in predictable ways. Configure `retryOn` to handle each: ```yaml retryPolicy: maxAttempts: 3 backoff: exponential initialDelay: PT5S maxDelay: PT120S jitter: true retryOn: - RATE_LIMITED # 429 Too Many Requests — back off and retry - TIMEOUT # Request timed out — retry # Note: VALIDATION errors (bad response format) go to dead letter — don't retry ``` If the LLM consistently returns malformed JSON (a `VALIDATION` error), the packet moves to the dead letter queue rather than retrying indefinitely. 
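You can guard this boundary inside the handler as well. Below is a minimal sketch in plain Python (the names `ValidationFailure` and `parse_routing_decision` are illustrative, not flowdsl-py SDK types) showing how malformed model output from the `path_a|path_b|path_c` routing prompt shown earlier can be surfaced as a non-retryable failure:

```python
import json

class ValidationFailure(Exception):
    """Malformed LLM output: retrying won't fix it, so dead-letter instead."""

def parse_routing_decision(raw: str) -> dict:
    """Parse the model's JSON routing reply.

    Non-JSON output or an unexpected route is a VALIDATION-class error:
    the packet belongs in the dead letter queue, not back in the retry loop.
    """
    try:
        decision = json.loads(raw)
    except json.JSONDecodeError as exc:
        raise ValidationFailure(f"model returned non-JSON output: {exc}") from exc
    if decision.get("route") not in {"path_a", "path_b", "path_c"}:
        raise ValidationFailure(f"unexpected route: {decision.get('route')!r}")
    return decision
```

On a `ValidationFailure`, a handler would surface a non-retryable error code so the runtime dead-letters the packet instead of spending retry attempts on output that will never parse.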
## Summary | Pattern | When to apply | | ----------------------------------- | ---------------------------------- | | `durable` on LLM edges | Always | | `idempotencyKey` on LLM edges | Always — LLM calls cost money | | `checkpoint` before LLM stages | When previous stages are expensive | | `retryOn: [RATE_LIMITED, TIMEOUT]` | All LLM retry policies | | `exponential` backoff with `jitter` | All LLM retry policies | | `maxDelay: PT120S` | Give rate limits time to reset | ## Next steps - [Idempotency](https://flowdsl.com/docs/guides/idempotency) — implementing the idempotent node pattern - [Choosing Delivery Modes](https://flowdsl.com/docs/guides/choosing-delivery-modes) — decision tree for all modes - [Writing a Python Node](https://flowdsl.com/docs/tutorials/writing-a-python-node) — implement an LLM node # Building and Publishing FlowDSL Nodes A FlowDSL node is a small service with two parts: a handler (the business logic) and a manifest (the identity document). This guide covers the full development lifecycle from local implementation to registry publication. 
## Node anatomy ```text my-node/ ├── main.go (or main.py) # Node server entry point ├── node.go (or node.py) # NodeHandler implementation ├── flowdsl-node.json # Manifest └── go.mod / requirements.txt ``` ## The flowdsl-node.json manifest The manifest is the node's identity document — it describes the node to the runtime and registry: ```json { "operationId": "send_sms_alert", "name": "SMS Alert", "version": "2.1.0", "description": "Sends an SMS alert via Twilio to a configured phone number", "runtime": "go", "inputs": [ { "name": "in", "packet": "AlertPayload", "description": "The alert to send" } ], "outputs": [ { "name": "out", "packet": "SmsResult", "description": "SMS delivery result" } ], "settings": { "type": "object", "properties": { "fromNumber": { "type": "string", "description": "Twilio sender number" }, "toNumber": { "type": "string", "description": "Recipient number" } }, "required": ["fromNumber", "toNumber"] }, "repository": "https://github.com/myorg/flowdsl-nodes", "author": "My Team", "license": "MIT", "tags": ["sms", "notifications", "twilio"] } ``` ## Local development workflow ### 1. Write the handler See [Write a Go Node](https://flowdsl.com/docs/tutorials/writing-a-go-node) or [Write a Python Node](https://flowdsl.com/docs/tutorials/writing-a-python-node) for complete implementation tutorials. ### 2. Register locally ```yaml # node-registry.yaml nodes: send_sms_alert: address: localhost:8083 version: "2.1.0" runtime: go ``` ### 3. Test in isolation ```go func TestSmsAlertNode(t *testing.T) { node := &SmsAlertNode{} err := node.Init(flowdsl.Settings{"fromNumber": "+15550100200", "toNumber": "+15550100300"}) require.NoError(t, err) input := flowdsl.MockNodeInput("in", map[string]any{ "message": "Production alert", "severity": "high", }) output, err := node.Handle(context.Background(), input) require.NoError(t, err) assert.Equal(t, "delivered", output.Packet("out").GetStringOr("status", "")) } ``` ### 4. 
Test with a live flow ```bash # Start your node ./sms-alert-node # Start the runtime with your test flow FLOWDSL_REGISTRY_FILE=./node-registry.yaml \ flowdsl-runtime start test-flow.flowdsl.yaml # Trigger the flow curl -X POST http://localhost:8081/flows/test_flow/trigger \ -d '{"message": "test", "severity": "high"}' ``` ## Node versioning Node versions follow semver (`major.minor.patch`): - **Patch** (1.0.x) — bug fixes, no contract changes - **Minor** (1.x.0) — new optional inputs/outputs, backwards compatible - **Major** (x.0.0) — breaking changes: renamed ports, removed outputs, changed packet types When you bump the major version, update all FlowDSL flows that reference this `operationId` before deploying. ## Node documentation best practices Write a clear `description` in the manifest and a `README.md`: ```markdown # send_sms_alert Sends an SMS alert via Twilio. ## Inputs | Port | Packet | Description | |------|--------|-------------| | `in` | `AlertPayload` | The alert to send | ## Outputs | Port | Packet | Description | |------|--------|-------------| | `out` | `SmsResult` | Delivery result with Twilio SID | ## Settings | Field | Type | Required | Description | |-------|------|----------|-------------| | `fromNumber` | string | Yes | Twilio sender number in E.164 format | | `toNumber` | string | Yes | Recipient number in E.164 format | ## Example \`\`\`yaml nodes: AlertEngineer: operationId: send_sms_alert kind: action settings: fromNumber: "+15550100200" toNumber: "+15550100300" \`\`\` ``` ## Publishing to repo.flowdsl.com (coming soon) The public node registry at `repo.flowdsl.com` is coming in a future release. 
When available:

```bash
# Authenticate
flowdsl auth login

# Publish
flowdsl publish --manifest flowdsl-node.json --tag latest

# Published nodes are resolvable by operationId
registry: https://repo.flowdsl.com
```

## Summary

| Step              | Tool                                  |
| ----------------- | ------------------------------------- |
| Implement handler | `flowdsl-go` or `flowdsl-py` SDK      |
| Write manifest    | `flowdsl-node.json`                   |
| Register locally  | `node-registry.yaml`                  |
| Test              | Go test / pytest + MockNodeInput      |
| Version           | Semver: breaking changes → major bump |
| Publish           | `flowdsl publish` (coming soon)       |

## Next steps

- [Write a Go Node](https://flowdsl.com/docs/tutorials/writing-a-go-node) — implementation tutorial
- [Write a Python Node](https://flowdsl.com/docs/tutorials/writing-a-python-node) — implementation tutorial
- [Node Manifest reference](https://flowdsl.com/docs/reference/node-manifest) — full manifest field reference

# Using Redelay as the FlowDSL Backend

[Redelay](https://redelay.com){rel="nofollow"} is a Python/FastAPI event framework — the first FlowDSL integration partner. It automatically generates AsyncAPI documents from Pydantic event models and provides the natural backend for Python-based FlowDSL flows.

## What redelay is

Redelay is a Python library that lets you define events as Pydantic models and subscribe to them as FastAPI route handlers. It generates an AsyncAPI document from your Pydantic models automatically — no manual schema writing.
```python
# redelay: define events as Pydantic models
from redelay import subscribe
from pydantic import BaseModel

class OrderPlaced(BaseModel):
    order_id: str
    customer_id: str
    total: float
    currency: str

@subscribe(OrderPlaced)
async def handle_order_placed(event: OrderPlaced):
    # process the order
    pass
```

Running this app with `redelay serve` generates an AsyncAPI document at `/asyncapi.yaml`:

```yaml
asyncapi: "2.6.0"
info:
  title: Order Service Events
  version: "1.0.0"
channels:
  orders/order-placed:
    subscribe:
      message:
        $ref: "#/components/messages/OrderPlaced"
components:
  messages:
    OrderPlaced:
      payload:
        type: object
        properties:
          order_id: { type: string }
          customer_id: { type: string }
          total: { type: number }
          currency: { type: string }
        required: [order_id, customer_id, total, currency]
```

## Connecting redelay to FlowDSL

### Step 1: Start your redelay service

```bash
pip install redelay flowdsl-py
redelay serve --host 0.0.0.0 --port 8090
```

The AsyncAPI document is now available at `http://localhost:8090/asyncapi.yaml`.
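Before wiring the generated document into FlowDSL, it can be useful to sanity-check which channels and messages redelay actually produced. A small sketch in plain Python over the already-parsed document (`summarize_asyncapi` is a hypothetical helper, not part of redelay or flowdsl-py):

```python
def summarize_asyncapi(doc: dict) -> dict:
    """Map each channel in a parsed AsyncAPI 2.x document to its message names."""
    summary = {}
    for channel, operations in doc.get("channels", {}).items():
        names = []
        for op in ("subscribe", "publish"):
            ref = operations.get(op, {}).get("message", {}).get("$ref", "")
            if ref:
                # "#/components/messages/OrderPlaced" -> "OrderPlaced"
                names.append(ref.rsplit("/", 1)[-1])
        summary[channel] = names
    return summary

# Applied to the generated document above (after YAML parsing):
doc = {
    "channels": {
        "orders/order-placed": {
            "subscribe": {"message": {"$ref": "#/components/messages/OrderPlaced"}}
        }
    }
}
print(summarize_asyncapi(doc))  # {'orders/order-placed': ['OrderPlaced']}
```

If the channel or message names differ from what your FlowDSL `packet` references expect, fix the Pydantic models before moving to Step 2.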
### Step 2: Reference it in FlowDSL ```yaml flowdsl: "1.0" info: title: Order Processing Flow version: "1.0.0" asyncapi: http://localhost:8090/asyncapi.yaml nodes: OrderReceived: operationId: receive_order_event kind: source outputs: out: packet: "asyncapi#/components/messages/OrderPlaced" ProcessOrder: operationId: process_order kind: action inputs: in: packet: "asyncapi#/components/messages/OrderPlaced" edges: - from: OrderReceived to: ProcessOrder delivery: mode: durable packet: "asyncapi#/components/messages/OrderPlaced" ``` ### Step 3: Implement FlowDSL nodes as redelay handlers The `flowdsl-py` package provides a redelay integration module that lets you run FlowDSL node handlers as redelay subscribers: ```python from redelay import subscribe from flowdsl.redelay import as_flowdsl_node from pydantic import BaseModel class OrderPlaced(BaseModel): order_id: str customer_id: str total: float @subscribe(OrderPlaced) @as_flowdsl_node(operation_id="process_order") async def process_order(event: OrderPlaced): # This handler is automatically registered as a FlowDSL node # The FlowDSL runtime calls it when packets arrive on the edge result = await charge_payment(event.order_id, event.total) return {"orderId": event.order_id, "chargeId": result.charge_id} ``` ### Step 4: Register with the FlowDSL runtime ```yaml # node-registry.yaml nodes: process_order: address: localhost:8090 runtime: python version: "1.0.0" ``` ## How it fits together ```mermaid flowchart LR A["Pydantic models\n(redelay)"] -->|"auto-generates"| B["AsyncAPI doc\n/asyncapi.yaml"] B -->|"referenced by"| C["FlowDSL document\nasyncapi: ..."] C -->|"loaded by"| D["FlowDSL Runtime"] D -->|"calls"| E["redelay handlers\n(FlowDSL nodes)"] ``` ## Benefits of the redelay integration | Benefit | How | | --------------------- | ------------------------------------------------------------- | | No schema duplication | Pydantic models → AsyncAPI → FlowDSL (one source of truth) | | Type safety | Pydantic validation 
for all event schemas | | FastAPI ecosystem | Use all FastAPI middleware, auth, and tooling | | Auto-documented | redelay serves AsyncAPI UI at `/asyncapi-ui` | | Incremental adoption | Add FlowDSL orchestration without rewriting existing handlers | ## Summary - redelay generates AsyncAPI from Pydantic models automatically - Reference the generated AsyncAPI in your FlowDSL `asyncapi` field - Use `flowdsl-py`'s redelay integration to run node handlers as redelay subscribers - FlowDSL handles orchestration; redelay handles Python event handling ## Next steps - [AsyncAPI Integration](https://flowdsl.com/docs/guides/asyncapi-integration) — full AsyncAPI integration guide - [Writing a Python Node](https://flowdsl.com/docs/tutorials/writing-a-python-node) — implement nodes with `flowdsl-py` - [Python SDK Reference](https://flowdsl.com/docs/tools/python-sdk) — full SDK API reference # Stateful Workflows vs Streaming Pipelines FlowDSL handles two fundamentally different workload classes. Recognizing which you're building determines your delivery mode choices, node design patterns, and operational approach. ## Stateful workflows A stateful workflow processes each event as a complete, independent unit of work. Each event has business significance — it represents a real-world transaction, a support request, a user action. Losing an event is a business problem. 
**Characteristics:**

- Each event matters individually
- Multiple external interactions per event (LLM calls, database writes, API calls)
- Long processing time (seconds to minutes)
- Data loss is unacceptable
- Idempotency is required

**Examples:**

- Order fulfillment (receive order → validate → charge → ship)
- Email triage (fetch → classify → route → notify)
- Support ticket resolution (create → classify → draft → reply)
- Lead processing (receive → enrich → score → route → assign)

**Dominant delivery modes:** `durable` + `idempotencyKey`

```yaml
# Stateful workflow: every side-effecting step is durable
edges:
  - from: OrderReceived
    to: ValidateOrder
    delivery:
      mode: direct # Fast validation, data recoverable from source
  - from: ValidateOrder
    to: ChargePayment
    delivery:
      mode: durable # Critical: payment must not be lost or duplicated
      idempotencyKey: "{{payload.orderId}}-charge"
  - from: ChargePayment
    to: SendConfirmation
    delivery:
      mode: durable # Critical: confirmation email must not duplicate
      idempotencyKey: "{{payload.orderId}}-confirm"
```

## Streaming pipelines

A streaming pipeline processes a continuous, high-volume stream of events where individual events are less significant but aggregate throughput matters. Losing a handful of events is acceptable (or events are easily replayed from the source).
**Characteristics:** - High volume (thousands to millions of events per second) - Each step is cheap and deterministic - Processing time is microseconds to milliseconds - Events can be replayed from the source (Kafka, S3, database) - Throughput > durability for early stages **Examples:** - Log processing (ingest → parse → filter → aggregate → store) - Telemetry pipeline (collect → normalize → deduplicate → enrich → index) - Click stream processing (receive → extract → sessionize → compute → store) - IoT sensor data (ingest → validate → transform → store → alert) **Dominant delivery modes:** `direct` + `ephemeral` + `checkpoint` ```yaml # Streaming pipeline: modes escalate as value increases edges: - from: IngestLog to: ParseLog delivery: mode: direct # Fast, in-process, 1M+/sec - from: ParseLog to: EnrichWithGeoIP delivery: mode: ephemeral # Worker pool, burst absorption, ~50k/sec stream: log-enrich - from: EnrichWithGeoIP to: AggregateByService delivery: mode: checkpoint # Stage-level resume, ~10k/sec batchSize: 500 - from: AggregateByService to: DetectAnomalies delivery: mode: durable # LLM call: expensive, must not duplicate idempotencyKey: "{{payload.windowId}}-anomaly" ``` ## How to identify which you have Ask these questions about each incoming event: | Question | Stateful | Streaming | | ------------------------------------------------ | ------------------------- | -------------------------- | | Does losing this event cause a business problem? | Yes | No (or data is replayable) | | Does this event trigger external side effects? | Yes (SMS, email, payment) | Rarely | | Is each event individually significant? | Yes | No — aggregate matters | | Can I tolerate >1ms latency? | Yes (seconds are fine) | No — microseconds needed | | Volume per second? | Low–medium (<1000) | High (>1000) | ## Mixed flows Real-world flows often combine both patterns. 
A telemetry pipeline that detects anomalies and sends alerts is primarily streaming (cheap telemetry processing) with a stateful tail (durable alert delivery): ```mermaid flowchart LR A[IngestMetric] -->|direct| B[ParseMetric] B -->|ephemeral| C[Aggregate] C -->|checkpoint| D[DetectAnomaly] D -->|durable| E[SendPagerDutyAlert] ``` The `direct → ephemeral → checkpoint` chain is streaming. The `durable` at the alert step switches to stateful semantics for the business-critical notification. ## Summary | Attribute | Stateful | Streaming | | ------------------ | ------------ | ----------------------------------- | | Event significance | Each matters | Aggregate matters | | Primary modes | `durable` | `direct`, `ephemeral`, `checkpoint` | | Idempotency | Required | Usually not needed | | Throughput | Low–medium | High | | Acceptable latency | High | Low | ## Next steps - [Choosing Delivery Modes](https://flowdsl.com/docs/guides/choosing-delivery-modes) — decision tree for each edge - [High-Throughput Pipelines](https://flowdsl.com/docs/guides/high-throughput-pipelines) — optimizing streaming flows - [Idempotency](https://flowdsl.com/docs/guides/idempotency) — required for stateful workflows # Welcome to FlowDSL ## What is FlowDSL? FlowDSL is a **domain-specific language** (DSL) for describing executable event-driven flow graphs. A DSL is a specialized language designed for one specific problem domain — SQL is the DSL for querying databases, HTML is the DSL for web page structure, CSS is the DSL for styling. FlowDSL is the DSL for **event-driven orchestration**: it gives you a precise, readable notation for flows, nodes, edges, delivery policies, retry semantics, and runtime guarantees. Unlike general-purpose code where transport logic gets hardcoded into every service, FlowDSL separates *what your system does* (nodes) from *how data moves between steps* (edges). The runtime reads the FlowDSL document and enforces the guarantees you declared. 
| You already know | It's the DSL for | | ---------------- | ----------------------------------- | | SQL | Database queries | | HTML | Web page structure | | CSS | Visual styling | | AsyncAPI | Event and message contracts | | **FlowDSL** | **Event-driven flow orchestration** | FlowDSL sits alongside OpenAPI and AsyncAPI as a sibling specification: - **OpenAPI** describes your HTTP interfaces - **AsyncAPI** describes your event and message contracts - **FlowDSL** describes your executable flow graphs and runtime semantics FlowDSL references AsyncAPI message definitions instead of duplicating them — keeping a clean separation between message contracts and orchestration logic. ::tip{icon="i-heroicons-bolt"} **Core principle:** Nodes define business logic. Edges define delivery semantics. The runtime enforces guarantees. :: ## What you'll find here ::tip New to FlowDSL? Start with [Getting Started](https://flowdsl.com/docs/tutorials/getting-started) to run your first flow in 5 minutes. :: | Section | What's inside | | ----------------------------------------------- | ----------------------------------------------------------------------- | | [Concepts](https://flowdsl.com/docs/concepts) | The vocabulary of FlowDSL — nodes, edges, delivery modes, packets | | [Tutorials](https://flowdsl.com/docs/tutorials) | Step-by-step walkthroughs for building real flows | | [Guides](https://flowdsl.com/docs/guides) | Practical decisions: delivery mode selection, LLM flows, error handling | | [Reference](https://flowdsl.com/docs/reference) | Field-by-field spec reference for every object | | [Tools](https://flowdsl.com/docs/tools) | Studio, CLI, Go SDK, Python SDK, JS SDK | | [Community](https://flowdsl.com/docs/community) | Contributing, code of conduct, roadmap | ## Why FlowDSL? Most teams describe their event-driven systems in one of three ways — and all three have significant drawbacks. 
### vs hardcoding Kafka consumers When you hardcode Kafka consumers directly in your services, the topology of your flow is buried in application code. To understand the full data path, you have to read multiple codebases and grep for topic names. Delivery semantics — retries, dead letters, fan-out — are either absent or inconsistently implemented across teams. FlowDSL separates topology from transport. The `.flowdsl.yaml` document declares what connects to what and how, independently of any implementation language. The runtime enforces the delivery semantics so your application code stays clean. ### vs Temporal / Airflow Temporal and Airflow are powerful orchestration systems, but they are workflow engines first, not specifications. You describe workflows in code (Go, Python, Java), tied to a specific runtime. Portability across runtimes is an afterthought. FlowDSL is spec-first. The same `.flowdsl.yaml` file can be loaded by the Go runtime, the Python runtime, or any future runtime that implements the spec. The runtime is a pluggable implementation detail, not a vendor lock-in. ### vs n8n / Zapier n8n and Zapier are excellent for non-technical users wiring together SaaS integrations through a browser UI. But they are closed platforms, not open specifications. Your flows live inside the platform, not in your version control system. FlowDSL is code-first and developer-native. Flows are YAML or JSON files, committed to git, reviewed in pull requests, and validated in CI pipelines. The visual editor (Studio) is a convenience, not a requirement. 
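Because flows are plain files, validating them in CI is a small job. A minimal sketch of a GitHub Actions workflow, assuming the `flowdsl` CLI is already available on the runner (the install step, repository layout, and file path are illustrative, not prescribed by the spec):

```yaml
# .github/workflows/validate-flows.yml — hypothetical CI job
name: validate-flows
on: [pull_request]
jobs:
  validate:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      # Assumes the flowdsl CLI is installed or vendored on the runner
      - run: flowdsl validate flows/order-fulfillment.flowdsl.yaml
```

Any schema violation fails the check, so a broken flow never merges.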
## Where FlowDSL sits in the API ecosystem FlowDSL occupies a distinct layer alongside OpenAPI and AsyncAPI: ```mermaid graph LR A[HTTP Request/Response] -->|described by| B[OpenAPI] C[Events & Messages] -->|described by| D[AsyncAPI] E[Flow Graphs] -->|described by| F[FlowDSL] D -->|optionally referenced by| F B -->|optionally referenced by| F ``` - **OpenAPI** describes the shape of HTTP APIs - **AsyncAPI** describes event and message contracts - **FlowDSL** describes executable flow graphs with delivery semantics FlowDSL is fully self-contained. You do not need an AsyncAPI or OpenAPI document to write a FlowDSL flow — packets and events are defined natively in the `components` section. AsyncAPI and OpenAPI are optional integrations for teams that already have those contracts. ## A minimal FlowDSL flow ```yaml flowdsl: "1.0" info: title: Order Notification version: "1.0.0" nodes: OrderReceived: operationId: receive_order kind: source summary: Receives new order events NotifyCustomer: operationId: notify_customer kind: action summary: Sends a confirmation email edges: - from: OrderReceived to: NotifyCustomer delivery: mode: durable packet: OrderPayload components: packets: OrderPayload: type: object properties: orderId: type: string customerId: type: string total: type: number required: [orderId, customerId, total] ``` This is a complete FlowDSL document. It declares two nodes with their roles, one edge connecting them, and the packet schema that flows between them. The runtime reads this and wires everything together — you never write Kafka consumer code or MongoDB retry logic by hand. ## Quick start ```bash # Clone the examples repository git clone https://github.com/flowdsl/examples # Start all infrastructure (MongoDB, Redis, Kafka, Studio) cd examples && make up-infra # Open FlowDSL Studio open http://localhost:5173 ``` Load the **Order Fulfillment** example in Studio, click **Validate**, and explore your first flow. 
The [Getting Started tutorial](https://flowdsl.com/docs/tutorials/getting-started) walks through every step. ## The ecosystem FlowDSL is an open-source specification with a growing ecosystem of commercial tools: | Product | What it is | | ----------------- | -------------------------------------------------------------------- | | **flowdsl.com** | Open source specification, Studio, SDKs, reference runtime | | **Node Catalog** | Community and premium node marketplace *(coming soon)* | | **Cloud Service** | Managed workflow hosting — deploy and run flows *(coming soon)* | All specification development happens in the open at [github.com/flowdsl](https://github.com/flowdsl){rel=""nofollow""}. Apache 2.0 licensed. ## Next steps - [What is FlowDSL?](https://flowdsl.com/docs/concepts/what-is-flowdsl) — understand the spec from first principles - [Getting Started](https://flowdsl.com/docs/tutorials/getting-started) — run a flow in 5 minutes - [Delivery Modes](https://flowdsl.com/docs/concepts/delivery-modes) — the most important concept in FlowDSL - [GitHub](https://github.com/flowdsl){rel=""nofollow""} — source code, issues, discussions ## AI Integration FlowDSL docs are AI-native. Connect your IDE to the FlowDSL MCP server for real-time documentation access: ::code-group ```bash [Claude Code] claude mcp add --transport http flowdsl-docs https://flowdsl.com/mcp ``` ```json [Cursor (.cursor/mcp.json)] { "mcpServers": { "flowdsl-docs": { "type": "http", "url": "https://flowdsl.com/mcp" } } } ``` ```json [VS Code (.vscode/mcp.json)] { "servers": { "flowdsl-docs": { "type": "http", "url": "https://flowdsl.com/mcp" } } } ``` :: # Migration This section covers breaking changes between FlowDSL specification versions and how to migrate your flows and node implementations. ## Versioning policy FlowDSL follows semantic versioning for the specification: - **Patch** releases (1.0.x) — bug fixes in the schema, documentation corrections. No migration required. 
- **Minor** releases (1.x.0) — additive, backward-compatible changes. Existing flows continue to work. - **Major** releases (x.0.0) — breaking changes. Migration guide published. SDK packages (`flowdsl-go`, `flowdsl-py`, `@flowdsl/sdk`) are versioned independently but document which spec version they implement. ## Checking your spec version Every FlowDSL document declares its schema version: ```yaml flowdsl: "1.0" $schema: "https://flowdsl.com/schemas/v1/flowdsl.schema.json" ``` Run `flowdsl validate` to check compatibility with the latest schema: ```bash flowdsl validate my-flow.flowdsl.yaml ``` ## Current version: 1.0 The initial stable release. No migration required from pre-1.0 drafts — the draft format was not publicly supported. ### What 1.0 established - Core node kinds: `trigger`, `transform`, `router`, `enricher`, `validator`, `aggregator`, `emitter`, `delay`, `sink` - Five delivery modes: `direct`, `ephemeral`, `checkpoint`, `durable`, `stream` - `components.events`, `components.packets`, `components.policies` as first-class - Edge-level delivery policy (not node-level) - `operationId` values in `snake_case`, component names in `PascalCase` - `x-ui` extension fields for canvas layout - `.flowdsl.yaml` and `.flowdsl.json` file extensions ## Planned: 1.1 The 1.1 minor release is planned to add: - **`components.schemas`** — reusable JSON Schema fragments shareable across packets and events - **`node.timeout`** — per-node timeout declaration (currently only on the retry policy) - **`edge.priority`** — optional integer priority hint for queue-backed modes - **Flow-level `metadata`** block — arbitrary key/value metadata attached to the flow document None of these are breaking changes. Flows written for 1.0 will validate and run unchanged on 1.1. ## SDK migration ### Go SDK The Go SDK (`github.com/flowdsl/flowdsl-go`) follows its own semver. The `NodeHandler` interface is considered stable. 
Breaking changes will be announced in the SDK changelog and on the GitHub releases page. To update: ```bash go get github.com/flowdsl/flowdsl-go@latest ``` ### Python SDK ```bash pip install --upgrade flowdsl-py ``` Check the changelog at [github.com/flowdsl/flowdsl-py](https://github.com/flowdsl/flowdsl-py/releases){rel=""nofollow""}. ### JavaScript SDK ```bash npm update @flowdsl/sdk ``` ## CLI migration When the CLI validates your flow, it reports any schema version mismatch: ```text ⚠ my-flow.flowdsl.yaml uses flowdsl: "1.0" but CLI targets schema 1.1 All 1.0 documents are valid under 1.1. No changes required. ``` ## Getting help with migration - Open a [GitHub discussion](https://github.com/flowdsl/spec/discussions){rel=""nofollow""} for migration questions - Check the [spec changelog](https://github.com/flowdsl/spec/blob/main/CHANGELOG.md){rel=""nofollow""} for a detailed diff between versions - [Community Discord](https://discord.gg/MUjXSwGbUY){rel=""nofollow""} — `#migration` channel ## Next steps - [Community](https://flowdsl.com/docs/community) — contribute, report issues, ask questions - [CLI Tools](https://flowdsl.com/docs/tools/cli) — use `flowdsl validate` to check your flows # Communication Protocols FlowDSL is a language for describing communication between nodes. A node declares the protocols it supports via `runtime.supports` (an array). The specific protocol used for a connection is selected on the **edge** via the `protocol` field.
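A sketch of what those two declarations look like together in a flow document (node names and field values are illustrative, not normative):

```yaml
nodes:
  ClassifyEmail:
    operationId: classify_email
    kind: transform
    runtime:
      supports: [grpc, nats]   # protocols this node can speak
edges:
  - from: FetchEmail
    to: ClassifyEmail
    protocol: grpc             # transport chosen per edge
    delivery:
      mode: durable
```

The node advertises what it *can* use; the flow author picks what each connection *does* use.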
## Supported protocols | Protocol | Type | Latency | Throughput | Streaming | Broker required | Best for | | -------------- | -------------------- | ------- | ---------- | ------------- | ----------------- | ---------------------------------------- | | **In-Process** | Function call | \~µs | Highest | N/A | No | Same-language transforms | | **gRPC** | RPC | \~ms | Very high | Bidirectional | No | Cross-language commands, streaming | | **HTTP** | RPC | \~ms | High | No (polling) | No | Legacy nodes, simple integrations | | **NATS** | Pub/Sub + RPC | \~ms | Very high | JetStream | Yes (lightweight) | Events, service mesh, request/reply | | **Kafka** | Streaming | \~10ms | Highest | Continuous | Yes | Data pipelines, audit logs, fan-out | | **Redis** | Pub/Sub | \~ms | High | Pub/Sub | Yes | Burst smoothing, real-time notifications | | **ZeroMQ** | Brokerless messaging | \~µs | Very high | Patterns | No | High-perf local messaging, IoT | | **RabbitMQ** | Message queue | \~ms | High | No | Yes | Workflow routing, dead-letter queues | | **WebSocket** | Bidirectional stream | \~ms | High | Full-duplex | No | Browser clients, real-time dashboards | ## How to choose ```text Is the node in the same process? → proc Need cross-language RPC? → gRPC (default, recommended) Need pub/sub with request/reply? → NATS Need durable stream processing / event sourcing? → Kafka Need burst smoothing / ephemeral messaging? → Redis Pub/Sub Need high-perf brokerless messaging? → ZeroMQ Need advanced routing / dead-letter queues? → RabbitMQ Need browser-facing real-time updates? → WebSocket Legacy system with HTTP-only API? 
→ HTTP (not recommended for new nodes) ``` ### Strategic guidance | Use case | Recommended protocol | | -------------------------------- | -------------------- | | Commands (request/response) | gRPC | | Events (fire-and-forget) | NATS, RabbitMQ | | Data pipelines (high-throughput) | Kafka | | Real-time notifications | Redis, WebSocket | | IoT / embedded | ZeroMQ | | Browser integration | WebSocket | --- ## In-Process No network call. The runtime invokes the node as a direct function call within the same process. ```json { "runtime": { "supports": ["proc"] } } ``` No configuration needed. This is automatically selected for `direct` delivery mode when source and target share the same language runtime. --- ## gRPC **Default protocol.** Binary serialization via Protobuf, native bidirectional streaming, automatic code generation. ```json { "runtime": { "supports": ["grpc"], "grpc": { "port": 50051, "streaming": true, "maxConcurrentStreams": 100, "tls": true } } } ``` ### Configuration | Field | Type | Default | Description | | --------------------------- | ------- | ------- | -------------------------------------- | | `grpc.port` | integer | `50051` | gRPC listen port | | `grpc.streaming` | boolean | `false` | Enable `InvokeStream` server-streaming | | `grpc.maxConcurrentStreams` | integer | — | Max concurrent gRPC streams | | `grpc.tls` | boolean | — | Require TLS for connections | ### NodeService contract ```protobuf service NodeService { rpc Invoke (InvokeRequest) returns (InvokeResponse); rpc InvokeStream (InvokeRequest) returns (stream InvokeResponse); rpc Health (Empty) returns (HealthResponse); rpc Manifest (Empty) returns (ManifestResponse); } ``` | RPC | Description | | -------------- | -------------------------------------------------- | | `Invoke` | Unary request/response | | `InvokeStream` | Server-streaming for LLM and long-running nodes | | `Health` | Readiness check (`SERVING` / `NOT_SERVING`) | | `Manifest` | Auto-registration — returns the full node 
manifest | ### Port conventions | Language | Default port | Environment variable | | ---------- | ------------ | -------------------- | | Go | 50051 | `FLOWDSL_GRPC_PORT` | | Python | 50052 | `FLOWDSL_GRPC_PORT` | | JavaScript | 50053 | `FLOWDSL_GRPC_PORT` | ### TLS Enable TLS by setting `grpc.tls: true`. The runtime reads certificate/key from `FLOWDSL_TLS_CERT` and `FLOWDSL_TLS_KEY` environment variables. --- ## HTTP REST/JSON over HTTP. Supported for legacy integrations but **not recommended** for new nodes. ```json { "runtime": { "supports": ["http"] } } ``` HTTP nodes expose a `POST /invoke` endpoint. The runtime sends JSON-serialized packets and expects a JSON response. No streaming support. --- ## NATS Lightweight, high-performance messaging with pub/sub, request/reply, and queue groups. ```json { "runtime": { "supports": ["nats"], "nats": { "url": "nats://localhost:4222", "subject": "flowdsl.nodes.my_node", "queueGroup": "workers" } } } ``` ### Configuration | Field | Type | Default | Description | | ----------------- | ------------ | ------- | ---------------------------------------------------- | | `nats.url` | string (uri) | — | NATS server URL | | `nats.subject` | string | — | NATS subject to subscribe/publish on | | `nats.queueGroup` | string | — | Queue group for load balancing across node instances | ### When to use - Service mesh communication with request/reply semantics - Event distribution where message ordering per subject is sufficient - Lightweight pub/sub without the overhead of Kafka - Microservice discovery via subjects --- ## Kafka Durable stream processing with consumer groups, partitioning, and exactly-once semantics. ```json { "runtime": { "supports": ["kafka"] } } ``` Kafka transport is also used by the `stream` delivery mode. When a node supports `kafka`, it means the node natively consumes from or produces to Kafka topics. The delivery mode's Kafka usage is configured separately on edges. 
### When to use - High-throughput data pipelines (100K+ msg/sec) - Event sourcing and audit logging - Fan-out to multiple consumer groups - Stream processing with replay capability --- ## Redis Pub/Sub Fast publish/subscribe over Redis. No message persistence — subscribers must be connected to receive messages. ```json { "runtime": { "supports": ["redis"], "redis": { "url": "redis://localhost:6379", "channel": "flowdsl.nodes.my_node" } } } ``` ### Configuration | Field | Type | Default | Description | | --------------- | ------------ | ------- | ---------------------------------------- | | `redis.url` | string (uri) | — | Redis server URL | | `redis.channel` | string | — | Redis channel or pattern to subscribe to | ### When to use - Real-time notifications where message loss is acceptable - Cache invalidation events - Burst smoothing (also used by `ephemeral` delivery mode) - Simple pub/sub without dedicated message broker infrastructure --- ## ZeroMQ Brokerless, low-latency messaging library. Runs peer-to-peer without a central broker. ```json { "runtime": { "supports": ["zeromq"], "zeromq": { "address": "tcp://localhost:5555", "pattern": "pushPull" } } } ``` ### Configuration | Field | Type | Default | Description | | ---------------- | ------ | ------- | ---------------------------------------------------- | | `zeromq.address` | string | — | ZeroMQ bind/connect address | | `zeromq.pattern` | string | — | Messaging pattern: `pubSub`, `pushPull`, or `reqRep` | ### Patterns | Pattern | Description | | ---------- | ------------------------------- | | `pubSub` | One-to-many fan-out | | `pushPull` | Load-balanced work distribution | | `reqRep` | Synchronous request/reply | ### When to use - Ultra-low-latency messaging (\~µs) - IoT and embedded systems - High-frequency data distribution without broker overhead - In-datacenter node communication --- ## RabbitMQ Full-featured message broker with exchanges, routing keys, and dead-letter queues. 
```json { "runtime": { "supports": ["rabbitmq"], "rabbitmq": { "url": "amqp://localhost:5672", "exchange": "flowdsl.nodes", "routingKey": "my_node.invoke", "queue": "my_node_tasks" } } } ``` ### Configuration | Field | Type | Default | Description | | --------------------- | ------------ | ------- | ------------------------------- | | `rabbitmq.url` | string (uri) | — | AMQP connection URL | | `rabbitmq.exchange` | string | — | Exchange name | | `rabbitmq.routingKey` | string | — | Routing key for message routing | | `rabbitmq.queue` | string | — | Queue name for consuming | ### When to use - Complex routing logic (topic exchanges, headers routing) - Dead-letter queues for failed message handling - Priority queues - Workflows requiring message acknowledgement and redelivery --- ## WebSocket Full-duplex bidirectional communication over a single TCP connection. ```json { "runtime": { "supports": ["websocket"], "websocket": { "url": "ws://localhost:8080", "path": "/nodes/my_node" } } } ``` ### Configuration | Field | Type | Default | Description | | ---------------- | ------------ | ------- | ----------------------- | | `websocket.url` | string (uri) | — | WebSocket server URL | | `websocket.path` | string | — | WebSocket endpoint path | ### When to use - Browser-facing real-time dashboards - Live data feeds to frontend clients - Nodes that need persistent bidirectional connections - Integration with WebSocket-only external services --- ## Protocol resolution The runtime resolves the actual transport at deploy time based on the delivery mode and node configuration: | Delivery mode | Transport resolution | | -------------------- | -------------------------- | | `direct` (same lang) | In-process function call | | `direct` (diff lang) | gRPC (transparent upgrade) | | `ephemeral` | Redis | | `checkpoint` | MongoDB | | `durable` | MongoDB | | `stream` | Kafka | The `supports` field on a node defines which **communication protocols** the node can use. 
The flow author selects a specific protocol on each edge via the `protocol` field. The `delivery.mode` on an edge defines the **delivery semantics** — how packets flow between nodes including durability guarantees. ## Next steps - [Delivery Modes](https://flowdsl.com/docs/delivery-modes) — how delivery modes interact with transport - [Node Manifest Reference](https://flowdsl.com/docs/reference/node-manifest) — manifest fields including all transport configs - [Write a Go Node](https://flowdsl.com/docs/tutorials/writing-a-go-node) — full tutorial with gRPC setup - [Write a Python Node](https://flowdsl.com/docs/tutorials/writing-a-python-node) — Python tutorial - [Docker Compose Local](https://flowdsl.com/docs/tutorials/docker-compose-local) — running the full stack locally # Reference Complete specification reference. Use this section when you need the exact type, constraints, and behavior of a specific field. ## Spec reference | Page | What it covers | | ---------------------------------------------------------------------------- | ---------------------------------------------------------------------------- | | [FlowDSL Document](https://flowdsl.com/docs/reference/spec/flowdsl-document) | Top-level document fields: `flowdsl`, `info`, `nodes`, `edges`, `components` | | [Flow object](https://flowdsl.com/docs/reference/spec/flow) | Flow-level fields and lifecycle | | [Node object](https://flowdsl.com/docs/reference/spec/node) | Node fields: `operationId`, `kind`, `inputs`, `outputs`, `settings`, `x-ui` | | [Edge object](https://flowdsl.com/docs/reference/spec/edge) | Edge fields: `from`, `to`, `delivery`, `when` | | [DeliveryPolicy](https://flowdsl.com/docs/reference/spec/delivery-policy) | All delivery policy fields by mode | | [RetryPolicy](https://flowdsl.com/docs/reference/spec/retry-policy) | Retry policy fields and backoff strategies | | [Components](https://flowdsl.com/docs/reference/spec/components) | `packets`, `events`, `policies`, `nodes` | | 
[Packets](https://flowdsl.com/docs/reference/spec/packets) | Packet schema format and reference syntax | | [Runtime Bindings](https://flowdsl.com/docs/reference/spec/runtime-bindings) | `x-runtime` extension for infrastructure binding | | [Extensions (x-\*)](https://flowdsl.com/docs/reference/spec/extensions) | All supported extension fields | ## Node infrastructure | Page | What it covers | | --------------------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------- | | [Node Manifest](https://flowdsl.com/docs/reference/node-manifest) | `flowdsl-node.json` format reference | | [Communication Protocols](https://flowdsl.com/docs/reference/grpc-protocol) | All 9 communication protocols — In-Process, gRPC, HTTP, NATS, Kafka, Redis, ZeroMQ, RabbitMQ, WebSocket | | [Node Registry API](https://flowdsl.com/docs/reference/node-registry-api) | `repo.flowdsl.com` REST API reference | ## Schema The canonical FlowDSL JSON Schema is available at: ```text https://flowdsl.com/schemas/v1/flowdsl.schema.json ``` Validate a document: ```bash npx ajv-cli validate \ -s https://flowdsl.com/schemas/v1/flowdsl.schema.json \ -d my-flow.flowdsl.yaml ``` # Node Manifest Reference A **Node Manifest** is a `.flowdsl-node.json` file that describes a single installable node in the [repo.flowdsl.com](https://repo.flowdsl.com){rel=""nofollow""} registry. It captures the node's identity, runtime requirements, typed port contracts, and the settings schema used to render configuration forms in FlowDSL Studio. **Schema:** `https://flowdsl.com/schemas/v1/flowdsl-node.schema.json` --- ## File format Node manifests use the `.flowdsl-node.json` extension and validate against the `flowdsl-node.schema.json` schema (JSON Schema Draft-07).
```json { "id": "flowdsl/email-fetcher", "name": "Email Fetcher", "version": "1.0.0", "summary": "Polls an IMAP or POP3 mailbox and emits one event per received email.", "kind": "source", "language": "python", "author": { "name": "FlowDSL Team", "url": "https://flowdsl.com" }, "license": "Apache-2.0", "runtime": { "handler": "flowdsl.nodes.email.EmailFetcherNode", "supports": ["proc"] }, "outputs": [ ... ], "settingsSchema": { ... }, "published": true, "publishedAt": "2026-01-15T10:00:00Z" } ``` --- ## Top-level fields | Field | Type | Required | Description | | ------------------- | ------------------ | -------- | --------------------------------------------------------------------------------------------------------------- | | `id` | string | yes | Unique registry identifier. Format: `/`. e.g. `flowdsl/email-fetcher` | | `name` | string | yes | Human-readable display name shown in Studio and the marketplace. | | `version` | string | yes | Semver version of this manifest. | | `summary` | string | yes | One-line description shown in search results and Studio tooltips. | | `description` | string | no | Full markdown description rendered on the registry detail page. | | `kind` | enum | yes | Functional category. See [Node kinds](https://flowdsl.com/#node-kinds). | | `language` | enum | yes | Implementation language: `go`, `python`, or `nodejs`. | | `author` | object | yes | Node author. See [Author](https://flowdsl.com/#author). | | `license` | string | yes | SPDX license identifier, e.g. `Apache-2.0`. | | `repoUrl` | string (URI) | no | Source code repository URL. | | `docsUrl` | string (URI) | no | Documentation page URL. | | `icon` | string | no | Emoji or icon name displayed in Studio. | | `color` | string | no | Hex color for the Studio node card, e.g. `#4F46E5`. | | `tags` | string [] | no | Search and filter tags for the registry. | | `runtime` | object | yes | Runtime configuration. See [Runtime](https://flowdsl.com/#runtime). 
| | `inputs` | NodePort [] | no | Named input ports. See [Ports](https://flowdsl.com/#ports). | | `outputs` | NodePort [] | no | Named output ports. See [Ports](https://flowdsl.com/#ports). | | `settingsSchema` | object | no | JSON Schema object driving the Studio settings form. See [settingsSchema](https://flowdsl.com/#settingsschema). | | `dependencies` | string [] | no | Other node IDs required at runtime. | | `minRuntimeVersion` | string | no | Minimum FlowDSL runtime version required. | | `published` | boolean | yes | Whether the node is visible in the registry. | | `publishedAt` | string (date-time) | no | ISO 8601 timestamp when this version was published. | ## Port object Input and output ports are described as objects in the `inputs` and `outputs` arrays: | Field | Type | Required | Description | | ------------- | ------- | -------- | ---------------------------------------------------------------- | | `name` | string | Yes | Port name (matches port name in the flow document) | | `packet` | string | No | Packet type reference | | `description` | string | No | Description of this port | | `required` | boolean | No | Whether this port must have an incoming packet (default: `true`) | ## Supported protocols The `runtime` field in a FlowDSL document’s node definition includes a `supports` array listing which communication protocols the node can use. The specific protocol for a connection is selected on the **edge** via the `protocol` field. 
| Field | Type | Default | Description | | ------------------ | --------- | ---------- | -------------------------------------------------------------------------------------------------------------------------------------- | | `runtime.supports` | string [] | `["grpc"]` | Protocols this node supports: `"proc"`, `"grpc"`, `"http"`, `"nats"`, `"kafka"`, `"redis"`, `"zeromq"`, `"rabbitmq"`, or `"websocket"` | ### gRPC config | Field | Type | Default | Description | | ----------------------------------- | ------- | ------- | ---------------------------------------- | | `runtime.grpc.port` | integer | `50051` | gRPC listen port | | `runtime.grpc.streaming` | boolean | `false` | Whether the node supports `InvokeStream` | | `runtime.grpc.maxConcurrentStreams` | integer | — | Max concurrent gRPC streams | | `runtime.grpc.tls` | boolean | — | Whether TLS is required | ### NATS config | Field | Type | Default | Description | | ------------------------- | ------------ | ------- | ------------------------------------ | | `runtime.nats.url` | string (uri) | — | NATS server URL | | `runtime.nats.subject` | string | — | NATS subject to subscribe/publish on | | `runtime.nats.queueGroup` | string | — | Queue group for load balancing | ### Redis config | Field | Type | Default | Description | | ----------------------- | ------------ | ------- | ------------------------ | | `runtime.redis.url` | string (uri) | — | Redis server URL | | `runtime.redis.channel` | string | — | Redis channel or pattern | ### ZeroMQ config | Field | Type | Default | Description | | ------------------------ | ------ | ------- | --------------------------------------- | | `runtime.zeromq.address` | string | — | ZeroMQ bind/connect address | | `runtime.zeromq.pattern` | string | — | `"pubSub"`, `"pushPull"`, or `"reqRep"` | ### RabbitMQ config | Field | Type | Default | Description | | ----------------------------- | ------------ | ------- | ------------------- | | `runtime.rabbitmq.url` | string (uri) | — | 
AMQP connection URL | | `runtime.rabbitmq.exchange` | string | — | Exchange name | | `runtime.rabbitmq.routingKey` | string | — | Routing key | | `runtime.rabbitmq.queue` | string | — | Queue name | ### WebSocket config | Field | Type | Default | Description | | ------------------------ | ------------ | ------- | ----------------------- | | `runtime.websocket.url` | string (uri) | — | WebSocket server URL | | `runtime.websocket.path` | string | — | WebSocket endpoint path | The top-level `grpcPort` field in a node manifest is a convenient shorthand — equivalent to setting `runtime.grpc.port`. See [Communication Protocols](https://flowdsl.com/docs/reference/grpc-protocol) for full protocol details and usage guidance. ## Complete example ```json { "id": "myorg/llm-email-classifier", "name": "LLM Email Classifier", "version": "2.3.1", "summary": "Classifies support emails as urgent, normal, or spam using a language model.", "description": "Classifies support emails as urgent, normal, or spam using a language model. Returns a classification with confidence score and reasoning.", "kind": "transform", "language": "python", "author": { "name": "My Team", "email": "platform@myorg.com" }, "license": "Apache-2.0", "repoUrl": "https://github.com/myorg/flowdsl-nodes", "runtime": { "handler": "myorg.nodes.llm.EmailClassifierNode", "supports": ["grpc"] }, "inputs": [ { "name": "in", "packet": "EmailPayload", "description": "The email to classify", "required": true } ], "outputs": [ { "name": "out", "packet": "AnalysisResult", "description": "Classification result with confidence score" } ], "settingsSchema": { "type": "object", "properties": { "model": { "type": "string", "default": "gpt-4o-mini", "description": "LLM model to use for classification", "enum": ["gpt-4o", "gpt-4o-mini", "claude-3-5-sonnet-20241022"] }, "temperature": { "type": "number", "default": 0.1, "minimum": 0, "maximum": 2, "description": "Model temperature. Lower = more deterministic." }, "systemPrompt": { "type": "string", "description": "Custom system prompt. Uses a carefully tuned default if omitted." }, "maxTokens": { "type": "integer", "default": 500, "minimum": 100, "maximum": 4000 } } }, "tags": ["llm", "email", "classification", "nlp", "support"], "minRuntimeVersion": "1.0.0", "published": true } ``` ## Versioning Node versions follow semver: | Change type | Version bump | Example | | ------------------------------------------------- | ------------ | ------------- | | Bug fix, no contract change | Patch | 2.3.0 → 2.3.1 | | New optional input/output port | Minor | 2.3.0 → 2.4.0 | | Renamed port, removed output, changed packet type | Major | 2.3.0 → 3.0.0 | Before bumping major versions, update all FlowDSL documents that reference this node. ## `settingsSchema` The `settingsSchema` field is a JSON Schema Draft-07 object describing the node's static configuration. The runtime validates the `settings` provided in the FlowDSL node definition against this schema at startup. Provide defaults for all optional settings so the node works correctly when settings are omitted. ## Next steps - [Write a Go Node](https://flowdsl.com/docs/tutorials/writing-a-go-node) — implementing a node and writing its manifest - [Write a Python Node](https://flowdsl.com/docs/tutorials/writing-a-python-node) — same for Python - [Node Development guide](https://flowdsl.com/docs/guides/node-development) — full development lifecycle # Node Registry API Reference The node registry at `repo.flowdsl.com` provides a REST API for publishing and discovering FlowDSL nodes. The registry is currently in development — the API design below reflects the planned v1 surface. ::callout{type="info"} The Node Registry is coming soon. This page documents the planned API for early adopters building tooling.
:: ## Base URL ```text https://repo.flowdsl.com/api/v1 ``` ## Authentication Authenticated endpoints require a bearer token: ```bash Authorization: Bearer <token> ``` Obtain a token at [flowdsl.com/settings/tokens](https://flowdsl.com/settings/tokens){rel=""nofollow""}. ## Endpoints ### `GET /nodes` List all published nodes. Supports pagination and filtering. **Query parameters:** | Parameter | Type | Description | | --------- | ------- | ----------------------------------------------- | | `q` | string | Full-text search query | | `tag` | string | Filter by tag (repeatable) | | `runtime` | string | Filter by runtime: `go`, `python`, `javascript` | | `page` | integer | Page number (default: 1) | | `perPage` | integer | Results per page (default: 20, max: 100) | **Example:** ```bash curl "https://repo.flowdsl.com/api/v1/nodes?q=llm+classification&tag=email" ``` **Response:** ```json { "nodes": [ { "operationId": "llm_classify_email", "name": "LLM Email Classifier", "version": "2.3.1", "description": "Classifies support emails using a language model", "runtime": "python", "author": "My Team", "tags": ["llm", "email", "classification"], "downloads": 1234, "publishedAt": "2026-01-15T10:00:00Z" } ], "total": 1, "page": 1, "perPage": 20 } ``` --- ### `GET /nodes/{operationId}` Get details for a specific node. **Example:** ```bash curl https://repo.flowdsl.com/api/v1/nodes/llm_classify_email ``` **Response:** ```json { "operationId": "llm_classify_email", "name": "LLM Email Classifier", "latestVersion": "2.3.1", "description": "Classifies support emails as urgent, normal, or spam", "runtime": "python", "repository": "https://github.com/myorg/flowdsl-nodes", "author": "My Team", "license": "Apache-2.0", "tags": ["llm", "email", "classification"], "manifest": { "...": "full flowdsl-node.json content" }, "downloads": 1234, "publishedAt": "2026-01-15T10:00:00Z", "updatedAt": "2026-03-01T14:30:00Z" } ``` --- ### `GET /nodes/{operationId}/versions` List all published versions of a node.
**Response:**

```json
{
  "versions": [
    { "version": "2.3.1", "publishedAt": "2026-03-01T14:30:00Z", "changelog": "Fixed rate limit handling" },
    { "version": "2.3.0", "publishedAt": "2026-02-15T10:00:00Z", "changelog": "Added Claude support" },
    { "version": "2.2.0", "publishedAt": "2026-01-20T09:00:00Z", "changelog": "New optional systemPrompt setting" }
  ]
}
```

---

### `POST /nodes` (authenticated)

Publish a new node version.

**Request:**

```bash
curl -X POST https://repo.flowdsl.com/api/v1/nodes \
  -H "Authorization: Bearer <token>" \
  -H "Content-Type: application/json" \
  -d @flowdsl-node.json
```

**Response:**

```json
{
  "operationId": "llm_classify_email",
  "version": "2.3.1",
  "status": "published",
  "publishedAt": "2026-03-28T10:00:00Z"
}
```

---

### `GET /search`

Full-text search across node names, descriptions, and tags.

```bash
curl "https://repo.flowdsl.com/api/v1/search?q=twilio+sms&runtime=go"
```

## CLI usage

```bash
# Authenticate
flowdsl auth login

# Publish current node
flowdsl publish --manifest flowdsl-node.json

# Search the registry
flowdsl search "llm classification"

# Install a node (adds to node-registry.yaml)
flowdsl install llm_classify_email@2.3.1
```

## Next steps

- [Node Manifest reference](https://flowdsl.com/docs/reference/node-manifest) — `flowdsl-node.json` format
- [Node Development guide](https://flowdsl.com/docs/guides/node-development) — how to build and publish nodes

# Components Reference

The `components` section holds reusable definitions that can be referenced from the main flow document. It keeps the `nodes` and `edges` sections clean and avoids duplicating schema definitions.
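Entries under `components` are referenced either by name (as packets are on edges) or by a local `$ref` JSON Pointer such as `#/components/policies/StandardDurable`. A minimal sketch of how a tool might resolve the pointer form against a parsed document (the `resolve_ref` helper is hypothetical, not part of the runtime):

```python
def resolve_ref(document: dict, ref: str) -> dict:
    """Resolve a local '#/...' JSON Pointer against a parsed FlowDSL document."""
    if not ref.startswith("#/"):
        raise ValueError(f"only local references are supported: {ref}")
    node = document
    for part in ref[2:].split("/"):
        # JSON Pointer escaping: '~1' encodes '/', '~0' encodes '~'
        key = part.replace("~1", "/").replace("~0", "~")
        node = node[key]
    return node


doc = {
    "components": {
        "policies": {
            "StandardDurable": {
                "mode": "durable",
                "retryPolicy": {"maxAttempts": 3, "backoff": "exponential"},
            }
        }
    }
}

policy = resolve_ref(doc, "#/components/policies/StandardDurable")
print(policy["mode"])  # durable
```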
## `components` fields | Field | Type | Description | | ---------- | ------ | ------------------------------------------------------------------ | | `packets` | object | Map of `PacketName` → JSON Schema object | | `events` | object | Map of `EventName` → event schema | | `policies` | object | Map of `PolicyName` → DeliveryPolicy (reusable delivery templates) | | `nodes` | object | Map of `NodeName` → Node definition (shared node templates) | ## `components.packets` Define reusable packet schemas: ```yaml components: packets: OrderPayload: type: object properties: orderId: { type: string } customerId: { type: string } total: { type: number } currency: type: string enum: [USD, EUR, GBP] required: [orderId, customerId, total, currency] PaymentResult: type: object properties: orderId: { type: string } chargeId: { type: string } status: type: string enum: [succeeded, failed] required: [orderId, chargeId, status] ``` Reference from edges: ```yaml edges: - from: ValidateOrder to: ChargePayment delivery: mode: durable packet: OrderPayload # References components.packets.OrderPayload ``` Packets use JSON Schema Draft-07 and support `$ref` within the `components.packets` namespace: ```yaml components: packets: Address: type: object properties: street: { type: string } city: { type: string } country: { type: string } Customer: type: object properties: id: { type: string } address: $ref: "#/components/packets/Address" ``` ## `components.policies` Reusable delivery policy templates — define once, reference from multiple edges: ```yaml components: policies: StandardDurable: mode: durable retryPolicy: maxAttempts: 3 backoff: exponential initialDelay: PT2S maxDelay: PT60S jitter: true LlmDurable: mode: durable retryPolicy: maxAttempts: 3 backoff: exponential initialDelay: PT5S maxDelay: PT120S retryOn: [RATE_LIMITED, TIMEOUT] ``` Reference from edges: ```yaml edges: - from: ClassifyEmail to: SendSms delivery: $ref: "#/components/policies/StandardDurable" packet: AlertPayload 
idempotencyKey: "{{payload.messageId}}-sms" ``` ## `components.events` Native event definitions (distinct from packets — events are published to the event bus): ```yaml components: events: OrderProcessed: type: object properties: orderId: { type: string } processedAt: { type: string, format: date-time } required: [orderId, processedAt] ``` ## `components.nodes` Reusable node templates for common patterns: ```yaml components: nodes: StandardLlmNode: kind: llm settings: model: gpt-4o-mini temperature: 0.1 maxTokens: 500 ``` ## Naming convention All component names use `PascalCase`: `OrderPayload`, `StandardDurable`, `LlmEmailClassifier`. ## Next steps - [Packets reference](https://flowdsl.com/docs/reference/spec/packets) — packet schema format details - [DeliveryPolicy reference](https://flowdsl.com/docs/reference/spec/delivery-policy) — delivery policy fields - [FlowDSL Document reference](https://flowdsl.com/docs/reference/spec/flowdsl-document) — top-level document # DeliveryPolicy Reference The delivery policy object appears on every edge in a FlowDSL document. It governs how packets travel from the source node to the destination node. 
## All fields | Field | Type | Required | Applies to | Description | | -------------------- | ----------------- | -------- | ---------------------- | ----------------------------------------------------------------------- | | `mode` | string | Yes | All | Delivery mode: `direct`, `ephemeral`, `checkpoint`, `durable`, `stream` | | `packet` | string | No | All | Packet type reference: `"PacketName"` or `"asyncapi#/..."` | | `retryPolicy` | object | No | `durable`, `ephemeral` | Retry configuration | | `idempotencyKey` | string | No | `durable` | Template for deduplication key | | `deadLetterQueue` | string | No | `durable`, `ephemeral` | Named dead letter queue | | `timeout` | string (ISO 8601) | No | `durable`, `ephemeral` | Delivery timeout | | `priority` | integer (1–10) | No | `durable`, `ephemeral` | Delivery priority | | `batchSize` | integer | No | `checkpoint` | Packets to accumulate before checkpoint write | | `checkpointInterval` | integer | No | `checkpoint` | Checkpoint every N packets | | `topic` | string | No | `stream` | Kafka topic name | | `consumerGroup` | string | No | `stream` | Kafka consumer group | | `partitionKey` | string | No | `stream` | Template for Kafka partition key | | `stream` | string | No | `ephemeral` | Redis stream name | | `maxLen` | integer | No | `ephemeral` | Redis stream max length | ## Conditional field requirements | Field | Required for | Notes | | ---------------- | ------------ | ---------------------------------------------------- | | `mode` | All modes | Always required | | `topic` | `stream` | Required — Kafka topic | | `stream` | `ephemeral` | Required — Redis stream name | | `idempotencyKey` | — | Strongly recommended for `durable` with side effects | | `batchSize` | — | Default 1 for `checkpoint` | ## `direct` example ```yaml delivery: mode: direct packet: ParsedEvent ``` No additional fields are relevant for `direct` mode. Delivery is in-process and synchronous. 
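The mode-specific requirements in the tables above can be enforced mechanically by tooling. A minimal sketch, assuming the delivery policy has already been parsed into a dict (the `check_delivery` helper is hypothetical, not part of the FlowDSL CLI):

```python
# Fields that must be present for each delivery mode, per the tables above.
REQUIRED_BY_MODE = {
    "direct": [],
    "ephemeral": ["stream"],   # Redis stream name
    "checkpoint": [],
    "durable": [],
    "stream": ["topic"],       # Kafka topic
}


def check_delivery(policy: dict) -> list[str]:
    """Return a list of human-readable problems; an empty list means the policy passes."""
    mode = policy.get("mode")
    if mode is None:
        return ["'mode' is always required"]
    if mode not in REQUIRED_BY_MODE:
        return [f"unknown mode: {mode!r}"]
    return [
        f"mode {mode!r} requires {field!r}"
        for field in REQUIRED_BY_MODE[mode]
        if field not in policy
    ]


print(check_delivery({"mode": "stream"}))  # ["mode 'stream' requires 'topic'"]
print(check_delivery({"mode": "direct", "packet": "ParsedEvent"}))  # []
```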
## `ephemeral` example ```yaml delivery: mode: ephemeral packet: EnrichmentInput stream: enrichment-queue maxLen: 200000 retryPolicy: maxAttempts: 2 backoff: fixed initialDelay: PT1S timeout: PT30S priority: 5 ``` ## `checkpoint` example ```yaml delivery: mode: checkpoint packet: DocumentChunks batchSize: 50 checkpointInterval: 1000 ``` ## `durable` example ```yaml delivery: mode: durable packet: OrderPayload idempotencyKey: "{{payload.orderId}}-charge" deadLetterQueue: payment-failures timeout: PT60S retryPolicy: maxAttempts: 5 backoff: exponential initialDelay: PT2S maxDelay: PT120S jitter: true retryOn: [TIMEOUT, RATE_LIMITED, TEMPORARY] priority: 8 ``` ## `stream` example ```yaml delivery: mode: stream packet: OrderProcessed topic: orders.processed consumerGroup: fulfillment-workers partitionKey: "{{payload.customerId}}" ``` The `partitionKey` template ensures all events for the same customer are processed in order by the same Kafka partition. ## Field details ### `idempotencyKey` A Go template string evaluated against each packet. The result is stored in MongoDB to prevent duplicate processing. ```yaml idempotencyKey: "{{payload.orderId}}-{{operationId}}" ``` Available template variables: - `{{payload.*}}` — any field from the packet payload - `{{operationId}}` — the destination node's operationId - `{{flowId}}` — the current flow ID Must be unique per logical operation. Reusing keys across different operations causes incorrect deduplication. ### `timeout` ISO 8601 duration. If the node does not acknowledge the packet within this duration, the packet is considered failed and the retry policy is applied. | Duration | Meaning | | -------- | ---------- | | `PT30S` | 30 seconds | | `PT5M` | 5 minutes | | `PT1H` | 1 hour | ### `priority` Integer 1–10 (10 = highest). Higher priority packets are delivered first when the queue has backlog. Default: 5. ### `deadLetterQueue` Name of the dead letter collection in MongoDB. Defaults to `{flowId}.dead_letters`. 
Use named queues to separate errors by type for different monitoring and recovery workflows.

## Next steps

- [Delivery Modes concept](https://flowdsl.com/docs/concepts/delivery-modes) — behavior and guarantees
- [RetryPolicy reference](https://flowdsl.com/docs/reference/spec/retry-policy) — retry field reference
- [Edge reference](https://flowdsl.com/docs/reference/spec/edge) — the full edge object

# Edge Object Reference

Edges are declared as an array under the top-level `edges` key. Each edge connects a source node output to a destination node input and carries the delivery policy.

## Fields

| Field      | Type           | Required | Description                                         |
| ---------- | -------------- | -------- | --------------------------------------------------- |
| `from`     | string         | Yes      | Source: `"NodeName"` or `"NodeName.outputPort"`     |
| `to`       | string         | Yes      | Destination: `"NodeName"` or `"NodeName.inputPort"` |
| `delivery` | DeliveryPolicy | Yes      | The delivery policy governing this connection       |
| `when`     | string         | No       | Condition expression for conditional routing        |

## `from` and `to` syntax

| Syntax                | Meaning                                        |
| --------------------- | ---------------------------------------------- |
| `"NodeName"`          | Any output/input port (when node has one port) |
| `"NodeName.portName"` | Specific named port                            |

Named port syntax is required when a node has multiple outputs (router nodes).
## Examples ### Simple edge ```yaml edges: - from: ParseJson to: ValidateSchema delivery: mode: direct packet: RawPayload ``` ### Named port edge (router) ```yaml edges: - from: PriorityRouter.urgent to: UrgentHandler delivery: mode: durable packet: EventPayload - from: PriorityRouter.normal to: NormalHandler delivery: mode: ephemeral packet: EventPayload ``` ### Conditional edge ```yaml edges: - from: ScoreLead to: AssignToSalesRep when: "payload.score >= 80" delivery: mode: durable packet: ScoredLead - from: ScoreLead to: AddToNurture when: "payload.score < 80" delivery: mode: durable packet: ScoredLead ``` The `when` expression uses a simple predicate syntax evaluated against the packet payload. Supported operators: `==`, `!=`, `>`, `>=`, `<`, `<=`, `&&`, `||`, `!`. ### Edge with retry and idempotency ```yaml edges: - from: PrepareInvoice to: SendInvoiceEmail delivery: mode: durable packet: InvoicePayload idempotencyKey: "{{payload.invoiceId}}-email" retryPolicy: maxAttempts: 3 backoff: exponential initialDelay: PT3S maxDelay: PT60S ``` ## Constraint: edges must form a DAG FlowDSL documents must be directed acyclic graphs — edges must not form cycles. The validator rejects documents with cycles. ## Next steps - [DeliveryPolicy reference](https://flowdsl.com/docs/reference/spec/delivery-policy) — delivery policy fields - [Edges concept](https://flowdsl.com/docs/concepts/edges) — conceptual explanation - [Node reference](https://flowdsl.com/docs/reference/spec/node) — source and destination nodes # Extensions (x-*) Reference FlowDSL follows the OpenAPI/AsyncAPI convention of allowing `x-` prefixed extension fields anywhere in the document. Extension fields are optional and ignored by implementations that don't recognize them. 
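Because unrecognized extensions are simply skipped, a consumer can partition an object's keys into spec fields and `x-` extensions. A minimal sketch (hypothetical helper, assuming the node has been parsed into a dict):

```python
def split_extensions(obj: dict) -> tuple[dict, dict]:
    """Separate 'x-' prefixed extension fields from regular spec fields."""
    spec = {k: v for k, v in obj.items() if not k.startswith("x-")}
    extensions = {k: v for k, v in obj.items() if k.startswith("x-")}
    return spec, extensions


node = {
    "operationId": "filter_by_priority",
    "kind": "router",
    "x-ui": {"position": {"x": 420, "y": 200}},
    "x-tags": ["routing"],
}
spec, ext = split_extensions(node)
print(sorted(ext))  # ['x-tags', 'x-ui']
```

A runtime that doesn't recognize `x-ui` or `x-tags` simply acts on `spec` and leaves `ext` untouched.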
## Supported extensions ### `x-ui` (on nodes) Canvas layout hints for FlowDSL Studio: ```yaml nodes: FilterByPriority: operationId: filter_by_priority kind: router x-ui: position: x: 420 y: 200 color: "#7c3aed" # Hex color for the node card icon: filter # Icon name from Studio's icon library ``` | Field | Type | Description | | ------------ | ------ | -------------------------------------------------------------------------------------- | | `position.x` | number | Canvas X coordinate | | `position.y` | number | Canvas Y coordinate | | `color` | string | Hex color for node card background tint | | `icon` | string | Icon name: `filter`, `sparkles`, `mail`, `bell`, `database`, `globe`, `cpu`, `archive` | ### `x-tags` (on nodes) Categorization tags for Studio filtering: ```yaml nodes: LlmClassifier: operationId: llm_classify kind: llm x-tags: [llm, classification, email] ``` ### `x-deprecated` (on nodes) Marks a node as deprecated. Studio shows a warning on deprecated nodes: ```yaml nodes: OldEmailSender: operationId: send_email_v1 kind: action x-deprecated: true description: "Deprecated. Use SendEmailV2 instead." ``` ### `x-owner` (on nodes) Team or person responsible for a node: ```yaml nodes: ChargePayment: operationId: charge_payment kind: action x-owner: team: payments-platform slack: "#payments-platform" oncall: payments-oncall@pagerduty.com ``` ### `x-runtime` (on edges) Infrastructure binding overrides. See [Runtime Bindings reference](https://flowdsl.com/docs/reference/spec/runtime-bindings) for details. ### `x-alerts` (on nodes) Dead letter alerting configuration: ```yaml nodes: ChargePayment: operationId: charge_payment kind: action x-alerts: onDeadLetter: webhook: https://hooks.slack.com/services/... message: "Payment failed for order {{packet.orderId}}" minSeverity: error ``` ## Custom extensions You can define your own `x-` extensions for your tooling. 
The runtime ignores unknown extensions: ```yaml nodes: ProcessOrder: operationId: process_order kind: action x-cost-center: "engineering-platform" x-sla-target-ms: 500 x-compliance: pci-dss ``` ## Extension conventions - Always prefix with `x-` - Use `kebab-case` for extension names: `x-owner`, not `x-Owner` or `x_owner` - Document your custom extensions in a companion schema or README ## Next steps - [Runtime Bindings](https://flowdsl.com/docs/reference/spec/runtime-bindings) — `x-runtime` details - [FlowDSL Document](https://flowdsl.com/docs/reference/spec/flowdsl-document) — top-level document fields # Flow Object Reference In FlowDSL 1.0, the flow graph is described directly at the document root — there is no separate "flow" wrapper object. The `nodes` and `edges` at the top level define the single flow in the document. Future versions may support multiple flows per document. For now, one document = one flow. ## Flow identity A flow's identity is derived from its document metadata: ```yaml info: title: Order Fulfillment version: "2.1.0" ``` When loaded by the runtime, the flow is addressable as `order_fulfillment` (title converted to `snake_case`) or by an explicit flow ID if configured. ## Flow lifecycle states | State | Description | | ---------- | -------------------------------------- | | `draft` | Document written, not yet validated | | `valid` | Passed schema and semantic validation | | `deployed` | Loaded by the runtime, nodes connected | | `active` | Processing events | | `paused` | Deployed but not accepting new events | | `archived` | Removed from the runtime | ## Source nodes Nodes with no incoming edges are source nodes — they are the entry points for the flow. A flow can have multiple source nodes: ```yaml nodes: OrderReceived: # source: no incoming edge operationId: receive_order kind: source ManualOrderEntry: # source: no incoming edge operationId: receive_manual_order kind: source ``` Both `OrderReceived` and `ManualOrderEntry` are entry points. 
The runtime starts a new execution context for each event arriving at either source. ## Terminal nodes Nodes with no outgoing edges are terminal nodes — they end the flow: ```yaml nodes: ArchiveLead: operationId: archive_lead kind: terminal # No outgoing edges needed SpamFolder: operationId: move_to_spam kind: terminal ``` A flow completes when all active execution paths reach a terminal node (or have no more outgoing edges). ## Execution contexts Each event that enters a flow through a source node creates an independent execution context. Execution contexts are isolated — they do not share state unless they write to a shared external system (database, Kafka topic). The runtime assigns each execution context a unique `executionId` for tracing. ## Next steps - [FlowDSL Document](https://flowdsl.com/docs/reference/spec/flowdsl-document) — top-level document fields - [Node reference](https://flowdsl.com/docs/reference/spec/node) — node object fields - [Flows concept](https://flowdsl.com/docs/concepts/flows) — conceptual explanation # FlowDSL Document Reference A FlowDSL document is a YAML or JSON file that describes an executable flow graph. This page covers every top-level field. ## Top-level fields | Field | Type | Required | Description | | -------------- | ------------------- | -------- | ----------------------------------------------------------------- | | `flowdsl` | string | Yes | Specification version. Currently `"1.0"`. | | `info` | Info object | Yes | Document metadata. | | `externalDocs` | ExternalDocs object | No | Links to related documentation. | | `asyncapi` | string | No | Path or URL to an AsyncAPI document for message references. | | `openapi` | string | No | Path or URL to an OpenAPI document for schema references. | | `nodes` | object | Yes | Map of `NodeName` → Node object. Node names must be `PascalCase`. | | `edges` | array | Yes | Array of Edge objects. 
| | `components` | Components object | No | Reusable packets, events, policies, and node templates. | ## `info` object | Field | Type | Required | Description | | ------------- | ------ | -------- | ------------------------------------------- | | `title` | string | Yes | Human-readable name for the flow. | | `version` | string | Yes | Flow document version (semver recommended). | | `description` | string | No | Longer description. Supports markdown. | | `contact` | object | No | `name`, `email`, `url` of the owning team. | | `license` | object | No | `name` and `url` of the flow's license. | ## Complete example ```yaml flowdsl: "1.0" info: title: Order Fulfillment version: "2.1.0" description: | Processes customer orders from receipt to shipment confirmation. Handles payment, inventory reservation, and customer notification. contact: name: Platform Team email: platform@mycompany.com license: name: Apache-2.0 url: https://www.apache.org/licenses/LICENSE-2.0 externalDocs: url: https://github.com/mycompany/event-schemas/blob/main/asyncapi.yaml description: AsyncAPI event schema definitions asyncapi: "./events.asyncapi.yaml" nodes: OrderReceived: operationId: receive_order kind: source outputs: out: { packet: OrderPayload } ValidateOrder: operationId: validate_order kind: transform inputs: in: { packet: OrderPayload } outputs: out: { packet: ValidatedOrder } ChargePayment: operationId: charge_payment kind: action inputs: in: { packet: ValidatedOrder } outputs: out: { packet: PaymentResult } edges: - from: OrderReceived to: ValidateOrder delivery: mode: direct packet: OrderPayload - from: ValidateOrder to: ChargePayment delivery: mode: durable packet: ValidatedOrder idempotencyKey: "{{payload.orderId}}-charge" components: packets: OrderPayload: type: object properties: orderId: { type: string } customerId: { type: string } total: { type: number } currency: { type: string } required: [orderId, customerId, total, currency] ``` ## JSON equivalent ```json { "flowdsl": 
"1.0", "info": { "title": "Order Fulfillment", "version": "2.1.0" }, "nodes": { "OrderReceived": { "operationId": "receive_order", "kind": "source", "outputs": { "out": { "packet": "OrderPayload" } } } }, "edges": [ { "from": "OrderReceived", "to": "ValidateOrder", "delivery": { "mode": "direct", "packet": "OrderPayload" } } ], "components": { "packets": { "OrderPayload": { "type": "object", "properties": { "orderId": { "type": "string" } } } } } } ``` ## Validation Validate any FlowDSL document against the JSON Schema: ```bash # Using ajv-cli npx ajv-cli validate \ -s https://flowdsl.com/schemas/v1/flowdsl.schema.json \ -d my-flow.flowdsl.yaml # Using the FlowDSL CLI flowdsl validate my-flow.flowdsl.yaml ``` ## Next steps - [Node reference](https://flowdsl.com/docs/reference/spec/node) — the Node object - [Edge reference](https://flowdsl.com/docs/reference/spec/edge) — the Edge object - [DeliveryPolicy reference](https://flowdsl.com/docs/reference/spec/delivery-policy) — delivery configuration # Node Object Reference Nodes are declared under the top-level `nodes` map, keyed by their PascalCase name. ## Fields | Field | Type | Required | Description | | ------------- | ------ | -------- | --------------------------------------------------------------------------------------------------------------- | | `operationId` | string | Yes | `snake_case` identifier matching the registered handler. Must be unique across all nodes in the flow. | | `kind` | string | Yes | Node role: `source`, `transform`, `router`, `llm`, `action`, `checkpoint`, `publish`, `terminal`, `integration` | | `summary` | string | No | One-line description. Shown in Studio and API responses. | | `description` | string | No | Longer description. Supports markdown. | | `inputs` | object | No | Map of port name → Port object. | | `outputs` | object | No | Map of port name → Port object. | | `settings` | object | No | Static configuration injected into the node handler at initialization. 
| | `x-ui` | object | No | Canvas layout hints for Studio. | ## Port object | Field | Type | Required | Description | | ------------- | ------ | -------- | --------------------------------------------------------------- | | `packet` | string | No | Reference to a packet type: `"PacketName"` or `"asyncapi#/..."` | | `description` | string | No | Description of this port. | ## Node kinds | Kind | Has inputs | Has outputs | Description | | ------------- | ---------- | -------------- | ------------------------------------------------------------- | | `source` | No | Yes | Entry point. No incoming edges. Triggered by external events. | | `transform` | Yes | Yes | Maps input to output. Pure function, no side effects. | | `router` | Yes | Yes (multiple) | Routes input to one of several named outputs. | | `llm` | Yes | Yes | Calls a language model. | | `action` | Yes | Yes (optional) | Performs side effects in external systems. | | `checkpoint` | Yes | Yes | Saves state to MongoDB and passes through. | | `publish` | Yes | No | Publishes to an event bus. Terminal-like. | | `terminal` | Yes | No | End of path. No outputs. | | `integration` | Yes | Yes | Bridges to an external FlowDSL flow. | ## `x-ui` fields | Field | Type | Description | | ------------ | ------ | ------------------------------------ | | `position.x` | number | Canvas X coordinate | | `position.y` | number | Canvas Y coordinate | | `color` | string | Hex color for the node card | | `icon` | string | Icon name from Studio's icon library | ## Complete example ```yaml nodes: LlmAnalyzeEmail: operationId: llm_analyze_email kind: llm summary: Classifies email as urgent, normal, or spam description: | Reads the email subject and body and uses an LLM to classify the email into one of three categories. Returns a classification with confidence score and reasoning. 
inputs: in: packet: EmailPayload description: The email to classify outputs: out: packet: AnalysisResult description: Classification result with confidence and reason settings: model: gpt-4o-mini temperature: 0.1 maxTokens: 500 systemPrompt: | Classify the email as urgent, normal, or spam. Return JSON: {"classification": "...", "confidence": 0.0-1.0, "reason": "..."} x-ui: position: x: 420 y: 200 color: "#7c3aed" icon: sparkles ``` ## Naming rules | Rule | Correct | Incorrect | | ---------------------------- | ---------------------------------- | ------------------------------------------ | | Node names: PascalCase | `OrderReceived`, `ValidatePayment` | `order_received`, `validatePayment` | | `operationId`: snake\_case | `validate_payment_amount` | `validatePaymentAmount`, `ValidatePayment` | | `operationId` must be unique | — | Same `operationId` in two nodes | ## Next steps - [Edge reference](https://flowdsl.com/docs/reference/spec/edge) — connecting nodes - [DeliveryPolicy reference](https://flowdsl.com/docs/reference/spec/delivery-policy) — delivery configuration - [Nodes concept](https://flowdsl.com/docs/concepts/nodes) — conceptual explanation # Packets Reference A packet is a typed schema for data flowing along an edge. Packets use JSON Schema Draft-07 and can be defined natively or referenced from an AsyncAPI document. 
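Because packets are plain JSON Schema, a payload can be checked against its packet schema with any Draft-07 validator. The sketch below is deliberately simplified and covers only `type` and `required`; a real implementation would use a full validator such as the Python `jsonschema` package:

```python
# Simplified check covering only 'type' and 'required'. A real runtime
# applies full JSON Schema Draft-07 validation.
TYPES = {"object": dict, "array": list, "string": str, "boolean": bool,
         "number": (int, float), "integer": int}


def check_packet(schema: dict, payload) -> list[str]:
    """Return a list of validation errors; an empty list means the payload passes."""
    expected = schema.get("type")
    if expected and not isinstance(payload, TYPES[expected]):
        return [f"expected type {expected!r}"]
    return [
        f"missing required field {field!r}"
        for field in schema.get("required", [])
        if field not in payload
    ]


schema = {"type": "object", "required": ["messageId", "subject"]}
print(check_packet(schema, {"messageId": "a1", "subject": "Hi"}))  # []
print(check_packet(schema, {"messageId": "a1"}))  # ["missing required field 'subject'"]
```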
## Native packet definition ```yaml components: packets: EmailPayload: type: object title: Email Payload description: An incoming email from the support inbox properties: messageId: type: string description: Unique email identifier (e.g., IMAP UID) from: type: string format: email to: type: string format: email subject: type: string maxLength: 500 body: type: string receivedAt: type: string format: date-time headers: type: object additionalProperties: true required: [messageId, from, subject, body, receivedAt] additionalProperties: false ``` ## Supported JSON Schema Draft-07 keywords | Keyword | Supported | Notes | | --------------------------- | --------- | ------------------------------------------------------------------- | | `type` | Yes | `string`, `number`, `integer`, `boolean`, `object`, `array`, `null` | | `properties` | Yes | Object field definitions | | `required` | Yes | Array of required field names | | `enum` | Yes | Allowed values | | `format` | Yes | `email`, `date-time`, `uri`, `uuid` | | `minimum` / `maximum` | Yes | Number bounds | | `minLength` / `maxLength` | Yes | String length bounds | | `pattern` | Yes | Regex pattern | | `items` | Yes | Array item schema | | `additionalProperties` | Yes | `true`, `false`, or schema | | `$ref` | Yes | References within `components.packets` | | `oneOf` / `anyOf` / `allOf` | Yes | Schema composition | ## `$ref` within components Reference other packets within the same document: ```yaml components: packets: Address: type: object properties: street: { type: string } city: { type: string } country: { type: string, minLength: 2, maxLength: 2 } required: [street, city, country] Order: type: object properties: orderId: { type: string } shippingAddress: $ref: "#/components/packets/Address" billingAddress: $ref: "#/components/packets/Address" ``` ## AsyncAPI packet reference Reference a message schema from a linked AsyncAPI document: ```yaml asyncapi: "./events.asyncapi.yaml" edges: - from: OrderReceived to: 
ValidateOrder delivery: mode: durable packet: "asyncapi#/components/messages/OrderPlaced" ``` The `asyncapi#/...` syntax is a JSON Pointer path into the AsyncAPI document. The runtime resolves it to the message's `payload` schema. ## Packet reference on a node port ```yaml nodes: ValidateOrder: inputs: in: packet: OrderPayload # Native packet reference outputs: out: packet: "asyncapi#/components/messages/OrderValidated" # AsyncAPI reference ``` ## Naming convention | Element | Convention | | -------------- | ------------ | | Packet names | `PascalCase` | | Property names | `camelCase` | ## Runtime validation The runtime validates packets: 1. **At startup:** Verifies all referenced packet names exist. 2. **At runtime:** Validates each packet against its JSON Schema before delivery. Invalid packets are rejected and moved to the dead letter queue with a `VALIDATION` error code. ## Next steps - [Components reference](https://flowdsl.com/docs/reference/spec/components) — the components section - [DeliveryPolicy reference](https://flowdsl.com/docs/reference/spec/delivery-policy) — how packets are referenced on edges - [Packets concept](https://flowdsl.com/docs/concepts/packets) — conceptual explanation # RetryPolicy Reference A retry policy is nested inside a delivery policy and configures what the runtime does when a node handler returns a retriable error. ## Fields | Field | Type | Required | Default | Description | | -------------- | ----------------- | -------- | ---------- | ----------------------------------------------------------- | | `maxAttempts` | integer (1–10) | Yes | — | Total delivery attempts including the first. | | `backoff` | string | Yes | — | Backoff strategy: `"fixed"`, `"linear"`, or `"exponential"` | | `initialDelay` | ISO 8601 duration | Yes | — | Delay before the first retry. | | `maxDelay` | ISO 8601 duration | No | No limit | Maximum delay between retries. 
| | `jitter` | boolean | No | `false` | Add ±20% random variance to prevent retry storms. | | `retryOn` | array of string | No | All errors | Error codes to retry on. | ## `retryOn` error codes | Code | Meaning | | --------------- | -------------------------------------------- | | `TIMEOUT` | Node handler timed out | | `RATE_LIMITED` | External API returned rate limit error | | `TEMPORARY` | Transient failure (e.g., connection refused) | | `NETWORK_ERROR` | Network connectivity failure | Non-listed errors (`VALIDATION`, `PERMANENT`) always go to dead letter without retry, regardless of `retryOn`. ## Backoff strategies ### Fixed ```yaml retryPolicy: maxAttempts: 3 backoff: fixed initialDelay: PT5S ``` Retry timing: `wait 5s → wait 5s → dead letter` ### Linear ```yaml retryPolicy: maxAttempts: 4 backoff: linear initialDelay: PT2S maxDelay: PT10S ``` Retry timing: `wait 2s → wait 4s → wait 6s (capped 10s) → dead letter` ### Exponential ```yaml retryPolicy: maxAttempts: 5 backoff: exponential initialDelay: PT1S maxDelay: PT60S jitter: true retryOn: [RATE_LIMITED, TIMEOUT, TEMPORARY] ``` Retry timing (with jitter): `wait ~1s → wait ~2s → wait ~4s → wait ~8s → dead letter` ## Complete LLM retry policy example ```yaml edges: - from: PreparePrompt to: LlmSummarize delivery: mode: durable packet: PromptPayload idempotencyKey: "{{payload.documentId}}-summarize" retryPolicy: maxAttempts: 3 backoff: exponential initialDelay: PT5S maxDelay: PT120S jitter: true retryOn: [RATE_LIMITED, TIMEOUT] ``` ## Complete payment retry policy example ```yaml edges: - from: ValidateOrder to: ChargePayment delivery: mode: durable packet: ValidatedOrder idempotencyKey: "{{payload.orderId}}-charge" retryPolicy: maxAttempts: 3 backoff: fixed initialDelay: PT5S retryOn: [TIMEOUT, NETWORK_ERROR] ``` Fixed retry for payments: predictable, no exponential explosion, stops quickly so the customer doesn't wait too long. 
## Next steps - [DeliveryPolicy reference](https://flowdsl.com/docs/reference/spec/delivery-policy) — the parent object - [Error Handling guide](https://flowdsl.com/docs/guides/error-handling) — dead letters and recovery - [Retry Policies concept](https://flowdsl.com/docs/concepts/retry-policies) — conceptual explanation # Runtime Bindings Reference Runtime bindings allow you to configure delivery mode infrastructure at the edge level — overriding the runtime's defaults for a specific edge. Use them when you need non-default MongoDB collections, Redis streams, or Kafka topics. ## `x-runtime` extension ```yaml edges: - from: ProcessOrder to: NotifyFulfillment delivery: mode: durable packet: OrderProcessed x-runtime: mongodb: collection: orders.fulfillment_queue # Override default collection name writeConcern: majority # "majority" or "1" readPreference: primaryPreferred - from: IngestEvent to: ProcessEvent delivery: mode: ephemeral stream: high-priority-events x-runtime: redis: keyPrefix: "flowdsl:hp:" # Custom key prefix maxLen: 1000000 trimStrategy: maxlen # "maxlen" or "minid" - from: PublishResult to: ExternalConsumers delivery: mode: stream topic: results.processed x-runtime: kafka: acks: all # "0", "1", or "all" compression: lz4 # "none", "gzip", "snappy", "lz4", "zstd" batchSize: 16384 lingerMs: 5 ``` ## MongoDB binding fields | Field | Type | Default | Description | | ---------------- | ------ | ------------------ | ------------------------------------------------ | | `collection` | string | `{flowId}.packets` | MongoDB collection name | | `writeConcern` | string | `"1"` | `"1"` or `"majority"` | | `readPreference` | string | `"primary"` | `"primary"`, `"primaryPreferred"`, `"secondary"` | ## Redis binding fields | Field | Type | Default | Description | | -------------- | ------- | ------------ | ----------------------- | | `keyPrefix` | string | `"flowdsl:"` | Prefix for Redis keys | | `maxLen` | integer | 100000 | Maximum stream length | | `trimStrategy` | 
string | `"maxlen"` | `"maxlen"` or `"minid"` | ## Kafka binding fields | Field | Type | Default | Description | | ------------- | ------- | -------- | ------------------------------------------------- | | `acks` | string | `"1"` | Producer acknowledgment: `"0"`, `"1"`, or `"all"` | | `compression` | string | `"none"` | Compression codec | | `batchSize` | integer | 16384 | Producer batch size in bytes | | `lingerMs` | integer | 0 | Producer linger time in milliseconds | ## When to use runtime bindings Most flows do not need `x-runtime` — the runtime's defaults are appropriate for the vast majority of use cases. Use `x-runtime` when: - You need a specific MongoDB collection for compliance or monitoring - You need Kafka `acks: all` for a specific edge that requires stronger durability - You are tuning Kafka producer settings for high-throughput edges - You need to share a Redis stream with non-FlowDSL consumers ## Next steps - [Extensions reference](https://flowdsl.com/docs/reference/spec/extensions) — all supported extension fields - [DeliveryPolicy reference](https://flowdsl.com/docs/reference/spec/delivery-policy) — delivery policy fields - [High-Throughput Pipelines](https://flowdsl.com/docs/guides/high-throughput-pipelines) — performance tuning # CLI Tools The FlowDSL CLI provides command-line tools for validating, formatting, converting, and generating FlowDSL documents, and for publishing nodes to the registry. ## Installation ```bash npm install -g @flowdsl/cli # or brew install flowdsl/tap/flowdsl ``` Verify: ```bash flowdsl --version # flowdsl/1.0.0 darwin-arm64 node-v20.0.0 ``` ## Commands ### `flowdsl validate` Validates a FlowDSL document against the JSON Schema and semantic rules. 
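Some of those semantic rules are easy to state precisely — for example, every edge's `delivery.packet` must name an entry under `components.packets`. A minimal stand-alone sketch of that check in Python, where plain dicts stand in for the parsed document (`undefined_packets` is an illustrative helper, not the CLI's actual implementation):

```python
def undefined_packets(doc: dict) -> list[str]:
    """Return JSON-Pointer-style paths of edges whose packet is not defined."""
    defined = set(doc.get("components", {}).get("packets", {}))
    errors = []
    for i, edge in enumerate(doc.get("edges", [])):
        packet = edge.get("delivery", {}).get("packet")
        # "asyncapi#/..." references are resolved against the linked
        # AsyncAPI document instead, so skip them here.
        if packet and not packet.startswith("asyncapi#") and packet not in defined:
            errors.append(f"/edges/{i}/delivery/packet")
    return errors

doc = {
    "components": {"packets": {"OrderProcessed": {}}},
    "edges": [
        {"delivery": {"packet": "OrderProcessed"}},
        {"delivery": {"packet": "InvalidPacketName"}},
    ],
}
print(undefined_packets(doc))  # → ['/edges/1/delivery/packet']
```

The path strings mirror the JSON-Pointer locations the CLI prints in its error output.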
```bash flowdsl validate my-flow.flowdsl.yaml flowdsl validate my-flow.flowdsl.json # Validate all flows in a directory flowdsl validate flows/ # Exit code: 0 = valid, 1 = invalid ``` **Output:** ```text ✓ my-flow.flowdsl.yaml is valid # or on error: ✗ my-flow.flowdsl.yaml has 2 errors Error 1: /nodes/OrderReceived/operationId operationId "receiveOrder" must be snake_case. Try "receive_order". Error 2: /edges/1/delivery/packet Packet "InvalidPacketName" is not defined in components.packets ``` ### `flowdsl convert` Convert between YAML and JSON formats. ```bash # YAML → JSON flowdsl convert my-flow.flowdsl.yaml --output my-flow.flowdsl.json # JSON → YAML flowdsl convert my-flow.flowdsl.json --output my-flow.flowdsl.yaml # Output to stdout flowdsl convert my-flow.flowdsl.yaml --format json ``` ### `flowdsl format` Format a FlowDSL YAML document with consistent indentation and field ordering. ```bash flowdsl format my-flow.flowdsl.yaml # Format in place flowdsl format my-flow.flowdsl.yaml --check # Check without modifying (CI use) ``` ### `flowdsl generate` Generate code or documentation from a FlowDSL document. ```bash # Generate Go node stubs flowdsl generate go --output ./nodes/ my-flow.flowdsl.yaml # Generate Python node stubs flowdsl generate python --output ./nodes/ my-flow.flowdsl.yaml # Generate markdown documentation flowdsl generate docs --output ./docs/ my-flow.flowdsl.yaml ``` ### `flowdsl auth` Authenticate with repo.flowdsl.com. ```bash flowdsl auth login flowdsl auth status flowdsl auth logout ``` ### `flowdsl publish` Publish a node to the registry (requires authentication). 
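Like `validate` (exit code 0 = valid, 1 = invalid), `publish` reports failure through its exit status, so release scripts can gate publishing on a clean validation run. A stdlib sketch of that gating pattern — the `gate` helper is illustrative, and the CLI is not required to run it:

```python
import subprocess

def gate(*steps: list[str]) -> bool:
    """Run each command in order; stop at the first nonzero exit code."""
    for argv in steps:
        if subprocess.run(argv).returncode != 0:
            return False
    return True

# In a release script the steps would be the documented commands, e.g.:
#   gate(["flowdsl", "validate", "flows/"],
#        ["flowdsl", "publish", "--manifest", "flowdsl-node.json"])
```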
```bash flowdsl publish --manifest flowdsl-node.json flowdsl publish --manifest flowdsl-node.json --tag beta ``` ## Configuration Create a `.flowdslrc` file in your project root: ```yaml schemaUrl: https://flowdsl.com/schemas/v1/flowdsl.schema.json registryUrl: https://repo.flowdsl.com validateOnSave: true format: indent: 2 sortKeys: true ``` ## CI integration ```yaml # GitHub Actions - name: Validate FlowDSL flows run: | npm install -g @flowdsl/cli flowdsl validate flows/ ``` ## Next steps - [Studio](https://flowdsl.com/docs/tools/studio) — visual editor for FlowDSL - [Go SDK](https://flowdsl.com/docs/tools/go-sdk) — implement nodes in Go # Go SDK Reference `github.com/flowdsl/flowdsl-go` is the official Go SDK for implementing FlowDSL node handlers and running the FlowDSL runtime. ## Installation ```bash go get github.com/flowdsl/flowdsl-go ``` ## Core interfaces ### `NodeHandler` Every node implements this interface: ```go type NodeHandler interface { OperationID() string Init(settings Settings) error Handle(ctx context.Context, input NodeInput) (NodeOutput, error) } ``` ### `NodeInput` ```go type NodeInput interface { Packet(portName string) (Packet, error) Context() ExecutionContext } ``` ### `Packet` ```go type Packet interface { GetString(key string) (string, bool) GetStringOr(key, defaultVal string) string GetInt(key string) (int64, bool) GetFloat(key string) (float64, bool) GetBool(key string) (bool, bool) GetMap(key string) (map[string]any, bool) Data() map[string]any Has(key string) bool } ``` ### `NodeOutput` ```go // Build output by chaining Send calls output := flowdsl.NodeOutput{}.Send("out", resultPacket) // Multiple outputs (for router nodes) output := flowdsl.NodeOutput{}. Send("urgent_out", urgentPacket). 
Send("normal_out", normalPacket) ``` ### `NodeServer` ```go server := flowdsl.NewNodeServer( flowdsl.WithPort(8080), flowdsl.WithLogger(logger), flowdsl.WithManifestFile("flowdsl-node.json"), flowdsl.WithTLSConfig(tlsConfig), ) server.Register(&MyNode{}) server.ListenAndServe() ``` ## Error types ```go // Return typed errors for proper runtime handling return flowdsl.NodeOutput{}, flowdsl.NewNodeError( flowdsl.ErrCodeRateLimited, // or: ErrCodeTimeout, ErrCodeTemporary, ErrCodeValidation, ErrCodePermanent "Rate limit exceeded", originalErr, ) ``` ## ExecutionContext ```go ctx := input.Context() ctx.FlowID // string: the flow ID ctx.ExecutionID // string: unique execution context ID ctx.NodeID // string: the current node name ctx.IdempotencyKey // string: the edge's idempotency key (empty if not set) ctx.TraceHeaders // map[string]string: distributed tracing headers ``` ## Settings ```go type Settings map[string]any func (s Settings) GetString(key string) (string, bool) func (s Settings) GetStringOr(key, defaultVal string) string func (s Settings) GetInt(key string) (int64, bool) func (s Settings) GetBool(key string) (bool, bool) func (s Settings) GetStringSlice(key string) ([]string, bool) ``` ## Testing ```go import ( "context" "testing" flowdsl "github.com/flowdsl/flowdsl-go" flowdsltesting "github.com/flowdsl/flowdsl-go/testing" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) func TestMyNode(t *testing.T) { node := &MyNode{} err := node.Init(flowdsl.Settings{"key": "value"}) require.NoError(t, err) input := flowdsltesting.NewMockInput("in", map[string]any{ "field": "value", }) output, err := node.Handle(context.Background(), input) require.NoError(t, err) result := output.Packet("out") assert.Equal(t, "expected", result.GetStringOr("field", "")) } ``` ## Complete node example ```go package main import ( "context" "log/slog" flowdsl "github.com/flowdsl/flowdsl-go" ) type FilterNode struct { urgentPriorities map[string]bool } func (n *FilterNode) OperationID() string { return "filter_by_priority" } func (n *FilterNode) Init(settings flowdsl.Settings) error { list, _ := 
settings.GetStringSlice("urgentPriorities") if len(list) == 0 { list = []string{"P0", "P1"} } n.urgentPriorities = make(map[string]bool) for _, p := range list { n.urgentPriorities[p] = true } return nil } func (n *FilterNode) Handle(ctx context.Context, input flowdsl.NodeInput) (flowdsl.NodeOutput, error) { payload, err := input.Packet("in") if err != nil { return flowdsl.NodeOutput{}, err } priority := payload.GetStringOr("priority", "P2") if n.urgentPriorities[priority] { return flowdsl.NodeOutput{}.Send("urgent_out", payload), nil } return flowdsl.NodeOutput{}.Send("normal_out", payload), nil } func main() { server := flowdsl.NewNodeServer(flowdsl.WithPort(8080)) server.Register(&FilterNode{}) slog.Info("starting", "port", 8080) server.ListenAndServe() } ``` ## Next steps - [Write a Go Node tutorial](https://flowdsl.com/docs/tutorials/writing-a-go-node) — full step-by-step guide - [Python SDK](https://flowdsl.com/docs/tools/python-sdk) — Python equivalent # Tools FlowDSL provides official tooling for authoring, validating, and implementing flows. ## Pages in this section ### [Studio](https://flowdsl.com/docs/tools/studio) The FlowDSL visual editor. Browse, create, edit, and validate flows on a canvas. Available at [flowdsl.com/studio](https://flowdsl.com/studio) and locally via Docker. ### [CLI](https://flowdsl.com/docs/tools/cli) Command-line tools for validating, formatting, and generating FlowDSL documents. Install via `npm install -g @flowdsl/cli`. ### [Go SDK](https://flowdsl.com/docs/tools/go-sdk) `github.com/flowdsl/flowdsl-go` — the official Go SDK for implementing FlowDSL node handlers and running the runtime. ### [Python SDK](https://flowdsl.com/docs/tools/python-sdk) `flowdsl-py` — the official Python SDK for implementing FlowDSL nodes in async Python with FastAPI/Pydantic integration. 
### [JavaScript SDK](https://flowdsl.com/docs/tools/js-sdk) `@flowdsl/sdk` — the official TypeScript/JavaScript SDK for implementing FlowDSL nodes in Node.js. # JavaScript SDK Reference `@flowdsl/sdk` is the official TypeScript/JavaScript SDK for implementing FlowDSL node handlers in Node.js. ## Installation ```bash npm install @flowdsl/sdk # or yarn add @flowdsl/sdk ``` ## Core types ```typescript import { BaseNode, NodeInput, NodeOutput, NodeError, ErrorCode, NodeServer, } from '@flowdsl/sdk' ``` ### `BaseNode` ```typescript abstract class BaseNode { abstract operationId: string async init(settings: Record<string, unknown>): Promise<void> {} abstract handle(input: NodeInput): Promise<NodeOutput> } ``` ### `NodeInput` ```typescript interface NodeInput { packet(portName: string): Promise<Packet> context: ExecutionContext } ``` ### `Packet` ```typescript interface Packet { data: Record<string, unknown> get(key: string, defaultValue?: unknown): unknown getString(key: string, defaultValue?: string): string getNumber(key: string, defaultValue?: number): number getBoolean(key: string, defaultValue?: boolean): boolean has(key: string): boolean } ``` ### `NodeOutput` ```typescript class NodeOutput { send(portName: string, data: Record<string, unknown> | Packet): NodeOutput } ``` ## Complete example ```typescript import { BaseNode, NodeInput, NodeOutput, NodeError, ErrorCode, NodeServer } from '@flowdsl/sdk' class FilterNode extends BaseNode { operationId = 'filter_by_priority' private urgentPriorities = new Set(['P0', 'P1']) async init(settings: Record<string, unknown>): Promise<void> { const priorities = settings.urgentPriorities as string[] | undefined if (priorities) { this.urgentPriorities = new Set(priorities) } } async handle(input: NodeInput): Promise<NodeOutput> { const payload = await input.packet('in') const priority = payload.getString('priority', 'P2') if (this.urgentPriorities.has(priority)) { return new NodeOutput().send('urgent_out', payload.data) } return new NodeOutput().send('normal_out', payload.data) } } const server = new NodeServer({ port: 8080, manifestFile: 
'flowdsl-node.json', }) server.register(new FilterNode()) server.listen() ``` ## Next steps - [Write a Go Node](https://flowdsl.com/docs/tutorials/writing-a-go-node) — same pattern in Go - [Write a Python Node](https://flowdsl.com/docs/tutorials/writing-a-python-node) — same pattern in Python # Python SDK Reference `flowdsl-py` is the official Python SDK for implementing FlowDSL node handlers. It is built on asyncio and integrates with FastAPI and Pydantic v2. ## Installation ```bash pip install flowdsl-py ``` ## Core classes ### `BaseNode` ```python from flowdsl import BaseNode, NodeInput, NodeOutput class MyNode(BaseNode): operation_id = "my_operation" # class variable, matches operationId in flow async def init(self, settings: dict) -> None: """Called once at startup with static settings from the flow document.""" self.config = settings.get("key", "default") async def handle(self, input: NodeInput) -> NodeOutput: """Called once per incoming packet.""" payload = await input.packet("in") result = {"processed": True, "value": payload.get("value")} return NodeOutput().send("out", result) ``` ### `NodeInput` ```python class NodeInput: async def packet(self, port_name: str) -> Packet context: ExecutionContext ``` ### `Packet` ```python class Packet: data: dict[str, Any] # Raw underlying dict def get(self, key: str, default=None) -> Any def get_str(self, key: str, default: str = "") -> str def get_int(self, key: str, default: int = 0) -> int def get_float(self, key: str, default: float = 0.0) -> float def get_bool(self, key: str, default: bool = False) -> bool def get_list(self, key: str, default: list | None = None) -> list def has(self, key: str) -> bool ``` ### `NodeOutput` ```python class NodeOutput: def send(self, port_name: str, data: dict | Packet) -> NodeOutput # Method chaining for multiple outputs: # NodeOutput().send("urgent", urgent_data).send("normal", normal_data) ``` ### `NodeError` ```python from flowdsl import NodeError, ErrorCode raise 
NodeError(ErrorCode.RATE_LIMITED, "API rate limit", original=original_exc) raise NodeError(ErrorCode.TIMEOUT, "Request timed out") raise NodeError(ErrorCode.VALIDATION, "Missing required field: orderId") raise NodeError(ErrorCode.TEMPORARY, "Transient service error") raise NodeError(ErrorCode.PERMANENT, "Unsupported region — will never succeed") ``` ### `ExecutionContext` ```python class ExecutionContext: flow_id: str execution_id: str node_id: str idempotency_key: str | None trace_headers: dict[str, str] ``` ### `NodeServer` ```python from flowdsl import NodeServer server = NodeServer( port=8082, manifest_file="flowdsl-node.json", logger=logging.getLogger("my-node"), ) server.register(MyNode()) await server.serve() ``` ## Testing utilities ```python from flowdsl.testing import MockNodeInput input_ = MockNodeInput({"in": {"field": "value"}}) output = await node.handle(input_) assert output.packets["out"]["processed"] == True ``` ## Complete example ```python import asyncio from flowdsl import BaseNode, NodeInput, NodeOutput, NodeError, ErrorCode, NodeServer class UppercaseNode(BaseNode): operation_id = "uppercase_text" async def init(self, settings: dict) -> None: self.max_length = settings.get("maxLength", 1000) async def handle(self, input: NodeInput) -> NodeOutput: payload = await input.packet("in") text = payload.get_str("text") if not text: raise NodeError(ErrorCode.VALIDATION, "text field is required") if len(text) > self.max_length: raise NodeError(ErrorCode.VALIDATION, f"text exceeds maxLength ({self.max_length})") return NodeOutput().send("out", {"text": text.upper(), "originalLength": len(text)}) async def main(): server = NodeServer(port=8082, manifest_file="flowdsl-node.json") server.register(UppercaseNode()) await server.serve() if __name__ == "__main__": asyncio.run(main()) ``` ## Next steps - [Write a Python Node tutorial](https://flowdsl.com/docs/tutorials/writing-a-python-node) — full step-by-step guide - [LLM Flows 
guide](https://flowdsl.com/docs/guides/llm-flows) — Python LLM node patterns # FlowDSL Studio FlowDSL Studio is the official open-source visual editor for FlowDSL flows. It is built with React and React Flow and available at [flowdsl.com/studio](https://flowdsl.com/studio) or self-hosted locally. ## Running Studio **Cloud (no setup):** Navigate to [flowdsl.com/studio](https://flowdsl.com/studio) **Local (with full runtime):** ```bash git clone https://github.com/flowdsl/examples cd examples && make up-infra # Studio available at http://localhost:5173 ``` **Docker only:** ```bash docker run -p 5173:5173 flowdsl/studio:latest ``` ## Features | Feature | Description | | ----------------------- | ------------------------------------------------------------------ | | Visual canvas | Interactive node graph editor using React Flow | | NodeContractCard | Bilateral contract view showing inputs and outputs per node | | Delivery mode badges | Color-coded edges showing active delivery mode | | Schema validation | Real-time validation against the FlowDSL JSON Schema | | YAML/JSON import/export | Load and save `.flowdsl.yaml` and `.flowdsl.json` files | | Example flows | Built-in examples: Order Fulfillment, Email Triage, Sales Pipeline | | Execution monitor | Live view of flow execution (requires local runtime) | | Dead letter inspector | Browse and re-inject failed packets | | Node palette | Drag-and-drop node creation by kind | | Edge editor | Right-click edges to set delivery policies | ## Keyboard shortcuts | Shortcut | Action | | ------------------- | --------------------- | | `Ctrl+S` | Save YAML to disk | | `Ctrl+Shift+V` | Validate | | `Ctrl+E` | Export to JSON | | `Ctrl+Z` / `Ctrl+Y` | Undo / Redo | | `Space + drag` | Pan canvas | | `Ctrl+scroll` | Zoom | | `Ctrl+Shift+F` | Fit all nodes in view | | `Delete` | Delete selected | ## Source Studio is open source at [github.com/flowdsl/studio](https://github.com/flowdsl/studio). 
Built with React 18, TypeScript, React Flow, and Zustand. ## Next steps - [Using the Studio tutorial](https://flowdsl.com/docs/tutorials/using-the-studio) — step-by-step walkthrough - [Getting Started](https://flowdsl.com/docs/tutorials/getting-started) — run your first flow in Studio # Reference AsyncAPI Messages in FlowDSL FlowDSL is self-contained, but if your team already has AsyncAPI documents describing your event contracts, you can reference those schemas directly in FlowDSL rather than duplicating them. This tutorial shows how. ## Why reference AsyncAPI? If your team maintains an AsyncAPI document for your event bus, it is the authoritative schema for your events. Duplicating those schemas in FlowDSL creates drift — two definitions that can fall out of sync. Referencing them directly means: - One source of truth - FlowDSL validates packets against the actual AsyncAPI-defined schema - Changes to the AsyncAPI schema automatically apply to the FlowDSL flow - Studio can show the resolved schema in the NodeContractCard ## Your AsyncAPI document ```yaml # events.asyncapi.yaml asyncapi: "2.6.0" info: title: Support Events version: "1.0.0" channels: support/email-received: subscribe: operationId: email_received message: $ref: "#/components/messages/EmailReceived" components: messages: EmailReceived: name: EmailReceived payload: type: object properties: messageId: type: string from: type: string format: email subject: type: string body: type: string receivedAt: type: string format: date-time required: [messageId, from, subject, body, receivedAt] TicketCreated: name: TicketCreated payload: type: object properties: ticketId: type: string emailMessageId: type: string priority: type: string enum: [urgent, normal, low] status: type: string enum: [open, pending, resolved] required: [ticketId, emailMessageId, priority, status] ``` ## Reference it in FlowDSL ```yaml flowdsl: "1.0" info: title: Email Triage version: "1.0.0" # Point to the AsyncAPI document asyncapi: 
"./events.asyncapi.yaml" externalDocs: url: ./events.asyncapi.yaml description: AsyncAPI event contracts for the support system nodes: EmailReceiver: operationId: receive_email kind: source outputs: out: # Reference the AsyncAPI message directly packet: "asyncapi#/components/messages/EmailReceived" ClassifyEmail: operationId: classify_email kind: llm inputs: in: packet: "asyncapi#/components/messages/EmailReceived" outputs: out: # Native packet for the internal analysis result packet: AnalysisResult CreateTicket: operationId: create_ticket kind: action inputs: in: packet: AnalysisResult outputs: out: packet: "asyncapi#/components/messages/TicketCreated" edges: - from: EmailReceiver to: ClassifyEmail delivery: mode: durable packet: "asyncapi#/components/messages/EmailReceived" - from: ClassifyEmail to: CreateTicket delivery: mode: durable packet: AnalysisResult components: packets: # Native packet for data that doesn't exist in AsyncAPI AnalysisResult: type: object properties: email: type: object classification: type: string enum: [urgent, normal, spam] confidence: type: number required: [email, classification, confidence] ``` ## The reference syntax | Syntax | Resolves to | | --------------------------------------------- | --------------------------------------------------------------------------------- | | `asyncapi#/components/messages/EmailReceived` | The payload schema of the `EmailReceived` message in the linked AsyncAPI doc | | `MyPacket` | A packet defined in `components.packets.MyPacket` in the current FlowDSL document | The `#` is a JSON Pointer fragment. The path after `#` is resolved within the AsyncAPI document. ## How the runtime resolves references 1. At startup, the runtime reads the `asyncapi` field to locate the AsyncAPI document. 2. For each edge with an `asyncapi#/...` packet reference, the runtime resolves the JSON Pointer in the loaded AsyncAPI document. 3. The resolved JSON Schema is used for packet validation at runtime. 4. 
If the AsyncAPI document is unavailable, the runtime fails to start. ## Mixed native and AsyncAPI packets You can freely mix native packets and AsyncAPI references in the same flow: ```yaml components: packets: # This packet doesn't exist in AsyncAPI — define it natively InternalAnalysis: type: object properties: urgencyScore: { type: number } categories: { type: array, items: { type: string } } edges: - from: EmailReceiver to: Analyzer delivery: packet: "asyncapi#/components/messages/EmailReceived" # from AsyncAPI - from: Analyzer to: Router delivery: packet: InternalAnalysis # native packet ``` ## Validation Both documents validate independently: ```bash # Validate the AsyncAPI document asyncapi validate events.asyncapi.yaml # Validate the FlowDSL document flowdsl validate email-triage.flowdsl.yaml ``` The FlowDSL validator also resolves and validates all `asyncapi#/...` references — it will fail if the referenced path doesn't exist in the AsyncAPI document. ## What happens when AsyncAPI schemas change **Non-breaking changes** (adding optional fields): FlowDSL continues to work. Existing packets pass validation. Node handlers that don't read the new field are unaffected. **Breaking changes** (removing required fields, renaming fields): Packet validation at the edge will fail for packets that no longer conform to the updated schema. The runtime will reject those packets and move them to the dead letter queue. ::callout{type="warning"} Always version your AsyncAPI documents. Use `events.asyncapi.v2.yaml` instead of overwriting `events.asyncapi.yaml` when making breaking changes. Reference the specific version in your FlowDSL document. :: ## Summary - Set `asyncapi: "./path/to/asyncapi.yaml"` in the FlowDSL document to link the AsyncAPI file. - Reference messages with `asyncapi#/components/messages/MessageName` on any packet field. - Mix native `components.packets` with AsyncAPI references freely. 
- Both documents validate independently; FlowDSL also validates the reference paths. ## Next steps - [AsyncAPI Integration guide](https://flowdsl.com/docs/guides/asyncapi-integration) — full integration guide with schema evolution - [Packets concept](https://flowdsl.com/docs/concepts/packets) — native packet definitions - [Email Triage Flow](https://flowdsl.com/docs/tutorials/email-triage-flow) — complete email flow using AsyncAPI references # Run FlowDSL Locally with Docker Compose This tutorial walks through starting the complete FlowDSL infrastructure stack locally — MongoDB, Redis, Kafka, Studio, and the runtime — using Docker Compose. ## Prerequisites - **Docker Desktop 4.x+** with Docker Compose v2: `docker compose version` - 4GB+ RAM available for Docker - Ports 5173, 6379, 8081, 8082, 9092, 27017, 50051-50053 must be available ## Step 1: Clone the examples repository ```bash git clone https://github.com/flowdsl/examples cd examples ``` ## Step 2: Review the docker-compose.yaml ```yaml # docker-compose.yaml (excerpt) services: mongodb: image: mongo:7 ports: - "27017:27017" volumes: - mongodb_data:/data/db healthcheck: test: ["CMD", "mongosh", "--eval", "db.adminCommand('ping')"] interval: 10s timeout: 5s retries: 5 redis: image: redis:7-alpine ports: - "6379:6379" command: redis-server --save 60 1 --loglevel warning healthcheck: test: ["CMD", "redis-cli", "ping"] interval: 5s timeout: 3s retries: 10 zookeeper: image: confluentinc/cp-zookeeper:7.5.0 environment: ZOOKEEPER_CLIENT_PORT: 2181 ports: - "2181:2181" kafka: image: confluentinc/cp-kafka:7.5.0 depends_on: - zookeeper ports: - "9092:9092" environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true" healthcheck: test: ["CMD", "kafka-topics", "--bootstrap-server", "localhost:9092", "--list"] interval: 30s timeout: 10s retries: 5 kafka-ui: image: provectuslabs/kafka-ui:latest ports: - "8082:8080" 
environment: KAFKA_CLUSTERS_0_NAME: local KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092 depends_on: - kafka flowdsl-runtime: image: flowdsl/runtime:latest ports: - "8081:8081" environment: MONGODB_URI: mongodb://mongodb:27017/flowdsl REDIS_URL: redis://redis:6379 KAFKA_BROKERS: kafka:9092 FLOWDSL_REGISTRY_FILE: /app/node-registry.yaml FLOWDSL_NODE_TRANSPORT: grpc FLOWDSL_GRPC_GO_ADDR: nodes-go:50051 FLOWDSL_GRPC_PYTHON_ADDR: nodes-py:50052 FLOWDSL_GRPC_JS_ADDR: nodes-js:50053 volumes: - ./:/app/flows - ./node-registry.yaml:/app/node-registry.yaml depends_on: mongodb: condition: service_healthy redis: condition: service_healthy flowdsl-studio: image: flowdsl/studio:latest ports: - "5173:5173" environment: VITE_RUNTIME_URL: http://localhost:8081 depends_on: - flowdsl-runtime volumes: mongodb_data: ``` ## Step 3: Start everything ```bash make up-infra ``` Or directly: ```bash docker compose up -d ``` This pulls images on the first run (may take a few minutes). ## Step 4: Verify services ```bash docker compose ps ``` All services should show `healthy` or `running`: ```text NAME STATUS PORTS examples-mongodb-1 Up (healthy) 0.0.0.0:27017->27017/tcp examples-redis-1 Up (healthy) 0.0.0.0:6379->6379/tcp examples-zookeeper-1 Up 0.0.0.0:2181->2181/tcp examples-kafka-1 Up (healthy) 0.0.0.0:9092->9092/tcp examples-kafka-ui-1 Up 0.0.0.0:8082->8080/tcp examples-flowdsl-runtime-1 Up (healthy) 0.0.0.0:8081->8081/tcp examples-flowdsl-studio-1 Up 0.0.0.0:5173->5173/tcp ``` ## Step 5: Access the services | Service | URL | What it is | | ------------------- | --------------------- | ------------------------------ | | Studio | http://localhost:5173 | FlowDSL visual editor | | Runtime API | http://localhost:8081 | Flow management API | | Kafka UI | http://localhost:8082 | Browse Kafka topics | | MongoDB | localhost:27017 | Connect with MongoDB Compass | | Redis | localhost:6379 | Connect with Redis Insight | | Go nodes (gRPC) | localhost:50051 | gRPC endpoint for Go 
nodes | | Python nodes (gRPC) | localhost:50052 | gRPC endpoint for Python nodes | | JS nodes (gRPC) | localhost:50053 | gRPC endpoint for JS nodes | ## Step 6: Load and run a sample flow Copy an example flow to the working directory: ```bash cp order-fulfillment/order-fulfillment.flowdsl.yaml . ``` Deploy it via the runtime API: ```bash curl -X POST http://localhost:8081/flows \ -H "Content-Type: application/yaml" \ --data-binary @order-fulfillment.flowdsl.yaml ``` Or drag it into Studio at http://localhost:5173. Trigger the flow with a sample event: ```bash curl -X POST http://localhost:8081/flows/order_fulfillment/trigger \ -H "Content-Type: application/json" \ -d '{ "orderId": "ord-001", "customerId": "cust-123", "items": [{"sku": "WIDGET-A", "qty": 2, "price": 19.99}], "total": 39.98, "currency": "USD" }' ``` ## Step 7: View execution logs ```bash # Stream runtime logs docker compose logs -f flowdsl-runtime # View logs for all services docker compose logs -f ``` Execution events look like: ```text flowdsl-runtime | {"level":"INFO","executionId":"exec-001","flowId":"order_fulfillment","nodeId":"ValidateOrder","status":"completed","durationMs":2} flowdsl-runtime | {"level":"INFO","executionId":"exec-001","flowId":"order_fulfillment","nodeId":"ChargePayment","status":"completed","durationMs":847} ``` ## Step 8: Stop everything ```bash make down # or docker compose down ``` To also remove persisted data (MongoDB volumes): ```bash docker compose down -v ``` ## Troubleshooting **Port already in use:** ```bash # Find what's using port 27017 lsof -i :27017 # Change the host port in docker-compose.yaml if needed ``` **Kafka not starting:** Kafka requires at least 2GB RAM. Check Docker Desktop's memory limit in Preferences → Resources. Set to at least 4GB. **Runtime can't connect to MongoDB:** Wait for MongoDB to show `healthy` before the runtime starts. 
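That wait can also be scripted from the host — for example, before deploying flows from a script. A stdlib sketch that blocks until a TCP port accepts connections (note this only proves the port is open, not that MongoDB has finished initializing):

```python
import socket
import time

def wait_for_port(host: str, port: int, timeout: float = 60.0) -> bool:
    """Poll until host:port accepts a TCP connection, or give up after `timeout` seconds."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            # A successful connect proves only that the port is open.
            with socket.create_connection((host, port), timeout=2.0):
                return True
        except OSError:
            time.sleep(1.0)
    return False

# Usage, assuming the compose stack above is running:
#   wait_for_port("localhost", 27017)   # MongoDB
#   wait_for_port("localhost", 8081)    # runtime API
```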
You can restart just the runtime: ```bash docker compose restart flowdsl-runtime ``` **Studio shows "Runtime offline":** Ensure the runtime is healthy before opening Studio. The runtime needs MongoDB and Redis to be healthy first. ## Summary ```text make up-infra # Start all services docker compose ps # Check service health # Open Studio: http://localhost:5173 # Runtime API: http://localhost:8081 docker compose logs -f flowdsl-runtime # Stream logs make down # Stop all services ``` ## Next steps - [Getting Started](https://flowdsl.com/docs/tutorials/getting-started) — load the Order Fulfillment example - [Your First Flow](https://flowdsl.com/docs/tutorials/your-first-flow) — build and run a custom flow - [Go SDK](https://flowdsl.com/docs/tools/go-sdk) — run your own node implementations # Build an Email Triage Workflow This tutorial builds a production-ready email triage workflow using FlowDSL. An LLM classifies incoming emails as urgent, normal, or spam, and routes them to the appropriate handler — SMS alert, support ticket, or spam archive. ## What you'll build ```mermaid flowchart TD A[EmailFetcher\nsource] -->|durable| B[LlmAnalyzer\nllm] B -->|durable| C{LlmRouter\nrouter} C -->|urgent| D[UrgentSmsAlert\naction] C -->|normal| E[CreateSupportTicket\naction] C -->|spam| F[SpamArchiver\nterminal] ``` ## 1. Design: what problem this solves Your support inbox receives hundreds of emails per day. The team's on-call engineer needs to be paged immediately for genuinely urgent issues (production outages, security incidents) but must not be paged for routine requests. Spam must be auto-archived. Everything else becomes a support ticket. Manual triage is slow and inconsistent. This flow automates it with an LLM classifier that reads each email and routes it in under a second. ## 2. 
Flow document skeleton ```yaml flowdsl: "1.0" info: title: Email Triage version: "1.0.0" description: | Classifies incoming support emails as urgent, normal, or spam using an LLM and routes them to the appropriate handler. nodes: {} edges: [] components: packets: {} ``` ## 3. Add the EmailFetcher node The `EmailFetcher` polls an IMAP inbox or listens on a webhook from the email provider: ```yaml nodes: EmailFetcher: operationId: fetch_email kind: source summary: Fetches new emails from the support inbox outputs: out: packet: EmailPayload settings: imapHost: imap.support.mycompany.com imapPort: 993 pollIntervalSeconds: 30 ``` ## 4. Add the LlmAnalyzer node The LLM reads the email subject and body and returns a classification with confidence score: ```yaml nodes: LlmAnalyzer: operationId: llm_analyze_email kind: llm summary: Classifies email priority using an LLM inputs: in: packet: EmailPayload outputs: out: packet: AnalysisResult settings: model: gpt-4o-mini temperature: 0.1 systemPrompt: | You are an expert support email classifier. Classify the email as exactly one of: urgent, normal, or spam. Urgent: production outages, security incidents, data loss, legal issues. Normal: feature requests, bug reports, billing questions, general support. Spam: promotional emails, irrelevant content, automated notifications. Respond with JSON: {"classification": "urgent|normal|spam", "confidence": 0.0-1.0, "reason": "..."} ``` Add the edge from `EmailFetcher` to `LlmAnalyzer`. This is the most critical edge in the flow — an LLM call costs money and must not be re-run unnecessarily: ```yaml edges: - from: EmailFetcher to: LlmAnalyzer delivery: mode: durable packet: EmailPayload idempotencyKey: "{{payload.messageId}}-analyze" retryPolicy: maxAttempts: 3 backoff: exponential initialDelay: PT5S maxDelay: PT60S retryOn: [RATE_LIMITED, TIMEOUT] ``` **Why `durable` with `idempotencyKey`?** If the process crashes between the LLM response and the acknowledgment, the runtime will retry. 
Without an idempotency key, the LLM call would be made again — wasting money and potentially producing different results. The `messageId` ensures each email is analyzed exactly once. ## 5. Add the LlmRouter node ```yaml nodes: LlmRouter: operationId: route_by_classification kind: router summary: Routes emails based on LLM classification inputs: in: packet: AnalysisResult outputs: urgent: packet: AnalysisResult description: Production issues, security incidents normal: packet: AnalysisResult description: Standard support requests spam: packet: AnalysisResult description: Promotional and irrelevant emails ``` ```yaml edges: - from: LlmAnalyzer to: LlmRouter delivery: mode: durable packet: AnalysisResult ``` ## 6. Add UrgentSmsAlert with retry and idempotency ```yaml nodes: UrgentSmsAlert: operationId: send_urgent_sms kind: action summary: Sends an SMS to the on-call engineer via Twilio inputs: in: packet: AnalysisResult settings: twilioFromNumber: "+15550100200" oncallNumber: "+15550100300" ``` ```yaml edges: - from: LlmRouter.urgent to: UrgentSmsAlert delivery: mode: durable packet: AnalysisResult idempotencyKey: "{{payload.email.messageId}}-sms" retryPolicy: maxAttempts: 4 backoff: exponential initialDelay: PT2S maxDelay: PT30S jitter: true ``` **Why `idempotencyKey` here?** SMS is an irreversible side effect — sending the same alert twice wakes someone up twice. The idempotency key ensures the SMS is sent exactly once even if the node retries. ## 7. Add CreateSupportTicket ```yaml nodes: CreateSupportTicket: operationId: create_support_ticket kind: action summary: Creates a support ticket in the ticketing system inputs: in: packet: AnalysisResult settings: ticketingSystem: zendesk defaultPriority: normal ``` ```yaml edges: - from: LlmRouter.normal to: CreateSupportTicket delivery: mode: durable packet: AnalysisResult idempotencyKey: "{{payload.email.messageId}}-ticket" retryPolicy: maxAttempts: 3 backoff: exponential initialDelay: PT3S ``` ## 8. 
Add SpamArchiver ```yaml nodes: SpamArchiver: operationId: archive_spam kind: terminal summary: Archives the email in the spam folder inputs: in: packet: AnalysisResult ``` ```yaml edges: - from: LlmRouter.spam to: SpamArchiver delivery: mode: direct packet: AnalysisResult ``` **Why `direct` for spam?** Archiving spam is fast and idempotent — writing to a spam folder twice is harmless. `direct` avoids unnecessary MongoDB overhead. ## 9. Complete final YAML ```yaml flowdsl: "1.0" info: title: Email Triage version: "1.0.0" description: | Classifies incoming support emails as urgent, normal, or spam using an LLM and routes them to the appropriate handler. nodes: EmailFetcher: operationId: fetch_email kind: source summary: Fetches new emails from the support inbox outputs: out: { packet: EmailPayload } settings: imapHost: imap.support.mycompany.com pollIntervalSeconds: 30 LlmAnalyzer: operationId: llm_analyze_email kind: llm summary: Classifies email priority using an LLM inputs: in: { packet: EmailPayload } outputs: out: { packet: AnalysisResult } settings: model: gpt-4o-mini temperature: 0.1 systemPrompt: | Classify the email as: urgent, normal, or spam. 
Respond with JSON: {"classification": "urgent|normal|spam", "confidence": 0.0-1.0, "reason": "..."} LlmRouter: operationId: route_by_classification kind: router inputs: in: { packet: AnalysisResult } outputs: urgent: { packet: AnalysisResult } normal: { packet: AnalysisResult } spam: { packet: AnalysisResult } UrgentSmsAlert: operationId: send_urgent_sms kind: action inputs: in: { packet: AnalysisResult } settings: twilioFromNumber: "+15550100200" oncallNumber: "+15550100300" CreateSupportTicket: operationId: create_support_ticket kind: action inputs: in: { packet: AnalysisResult } settings: ticketingSystem: zendesk SpamArchiver: operationId: archive_spam kind: terminal inputs: in: { packet: AnalysisResult } edges: - from: EmailFetcher to: LlmAnalyzer delivery: mode: durable packet: EmailPayload idempotencyKey: "{{payload.messageId}}-analyze" retryPolicy: maxAttempts: 3 backoff: exponential initialDelay: PT5S retryOn: [RATE_LIMITED, TIMEOUT] - from: LlmAnalyzer to: LlmRouter delivery: mode: durable packet: AnalysisResult - from: LlmRouter.urgent to: UrgentSmsAlert delivery: mode: durable packet: AnalysisResult idempotencyKey: "{{payload.email.messageId}}-sms" retryPolicy: maxAttempts: 4 backoff: exponential initialDelay: PT2S maxDelay: PT30S jitter: true - from: LlmRouter.normal to: CreateSupportTicket delivery: mode: durable packet: AnalysisResult idempotencyKey: "{{payload.email.messageId}}-ticket" retryPolicy: maxAttempts: 3 backoff: exponential initialDelay: PT3S - from: LlmRouter.spam to: SpamArchiver delivery: mode: direct packet: AnalysisResult components: packets: EmailPayload: type: object properties: messageId: { type: string } from: { type: string, format: email } to: { type: string, format: email } subject: { type: string } body: { type: string } receivedAt: { type: string, format: date-time } headers: { type: object, additionalProperties: true } required: [messageId, from, subject, body, receivedAt] AnalysisResult: type: object properties: email: $ref: 
"#/components/packets/EmailPayload" classification: type: string enum: [urgent, normal, spam] confidence: type: number minimum: 0 maximum: 1 reason: type: string analyzedAt: type: string format: date-time required: [email, classification, confidence, analyzedAt] ``` ## 10. Load in Studio and monitor Drag the YAML into Studio. You'll see the six nodes and five edges with color-coded delivery mode badges on each edge. Click **Run Sample** and enter a test email payload: ```json { "messageId": "msg-001", "from": "user@example.com", "subject": "Production database is down", "body": "Our primary database is returning connection refused errors. All services are affected.", "receivedAt": "2026-03-28T10:00:00Z" } ``` Watch the execution monitor show each node firing in sequence, the LLM classification returning `urgent`, and the SMS alert being enqueued. ## Summary | Pattern | Where used | Why | | ------------------------------- | ------------------------------- | ------------------------------------------------ | | `durable` + `idempotencyKey` | LLM edge, SMS edge, ticket edge | Expensive side effects that must not duplicate | | Exponential backoff with jitter | SMS and ticket edges | Avoid retry storms against rate-limited services | | `direct` for spam | Spam archival | Fast, idempotent, no durability needed | | `router` node | LlmRouter | Code-driven multi-way routing | ## Next steps - [LLM Flows](https://flowdsl.com/docs/guides/llm-flows) — deep dive into LLM orchestration patterns - [Idempotency](https://flowdsl.com/docs/guides/idempotency) — how to write safe idempotent node handlers - [Write a Python Node](https://flowdsl.com/docs/tutorials/writing-a-python-node) — implement the `LlmAnalyzer` in Python # Getting Started with FlowDSL This tutorial gets you from zero to a running flow in five minutes. You will clone the example repository, start the infrastructure, open FlowDSL Studio, and explore a real order fulfillment flow. 
## What you'll learn

- How to start the FlowDSL infrastructure stack with Docker Compose
- How to open and navigate FlowDSL Studio
- How to validate a FlowDSL document
- What the key parts of a flow document mean

## Prerequisites

- **Docker Desktop** — [download here](https://www.docker.com/products/docker-desktop){rel="nofollow"}
- **Git**
- **Node.js 20+** (for Studio development mode, optional)
- **Go 1.21+** (optional, for running node implementations)

## Step 1: Clone the examples repository

```bash
git clone https://github.com/flowdsl/examples
cd examples
```

The repository contains several complete flow examples, the infrastructure Docker Compose file, and pre-built node implementations.

## Step 2: Start the infrastructure

```bash
make up-infra
```

This starts:

| Service         | Port  | What it is                                   |
| --------------- | ----- | -------------------------------------------- |
| MongoDB         | 27017 | Backing store for `durable` and `checkpoint` |
| Redis           | 6379  | Backing store for `ephemeral`                |
| Kafka           | 9092  | Backing store for `stream`                   |
| Zookeeper       | 2181  | Kafka coordinator                            |
| FlowDSL Studio  | 5173  | Visual editor                                |
| FlowDSL Runtime | 8081  | The runtime API                              |

Wait for all services to show `healthy` in `docker compose ps`.

```bash
docker compose ps
```

```text
NAME              STATUS    PORTS
flowdsl-mongodb   healthy   0.0.0.0:27017->27017/tcp
flowdsl-redis     healthy   0.0.0.0:6379->6379/tcp
flowdsl-kafka     healthy   0.0.0.0:9092->9092/tcp
flowdsl-studio    healthy   0.0.0.0:5173->5173/tcp
flowdsl-runtime   healthy   0.0.0.0:8081->8081/tcp
```

## Step 3: Open FlowDSL Studio

Navigate to <http://localhost:5173> in your browser.

![Studio welcome screen](https://flowdsl.com/img/docs/getting-started-studio-welcome.png)

You'll see the Studio canvas — an empty graph editor with a toolbar at the top and a node palette on the right.
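If you want to script the wait from Step 2 rather than re-running `docker compose ps` by hand, the check is just parsing its output for rows whose status isn't `healthy`. A minimal sketch, assuming the column layout shown above (`NAME STATUS PORTS`); this is illustrative, not part of the examples repo:

```go
package main

import (
	"fmt"
	"strings"
)

// unhealthy returns service names whose STATUS column is not "healthy",
// given text in the layout `docker compose ps` printed above.
func unhealthy(psOutput string) []string {
	var bad []string
	for i, line := range strings.Split(strings.TrimSpace(psOutput), "\n") {
		fields := strings.Fields(line)
		if i == 0 || len(fields) < 2 {
			continue // skip the header row and blank lines
		}
		if fields[1] != "healthy" {
			bad = append(bad, fields[0])
		}
	}
	return bad
}

func main() {
	out := `NAME              STATUS     PORTS
flowdsl-mongodb   healthy    0.0.0.0:27017->27017/tcp
flowdsl-runtime   starting   0.0.0.0:8081->8081/tcp`
	fmt.Println(unhealthy(out)) // [flowdsl-runtime]
}
```

In practice you would feed it `docker compose ps` output captured from a shell and loop until the slice comes back empty.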
## Step 4: Load the Order Fulfillment example Click **File → Open Example → Order Fulfillment** or drag the file `examples/order-fulfillment/order-fulfillment.flowdsl.yaml` into the Studio canvas. ![Studio with Order Fulfillment flow loaded](https://flowdsl.com/img/docs/getting-started-studio.png) You'll see five nodes laid out on the canvas: ```mermaid flowchart LR A[OrderReceived\nsource] -->|direct| B[ValidateOrder\ntransform] B -->|durable| C[ReserveInventory\naction] C -->|durable| D[ChargePayment\naction] D -->|durable| E[SendConfirmation\naction] ``` ## Step 5: Validate the flow Click the **Validate** button in the top toolbar. The validator checks: - The document conforms to the FlowDSL JSON Schema - All referenced packet types exist in `components.packets` - All `operationId` values are unique - All edges reference valid node names You should see a green **Valid** status. If you see errors, they'll appear in the Validation panel with file paths and line numbers. ## Step 6: Export to JSON Click **File → Export → JSON** to see the canonical form of the document. 
```json { "flowdsl": "1.0", "info": { "title": "Order Fulfillment", "version": "1.0.0" }, "nodes": { "OrderReceived": { "operationId": "receive_order", "kind": "source" }, "ValidateOrder": { "operationId": "validate_order", "kind": "transform" }, "ReserveInventory": { "operationId": "reserve_inventory", "kind": "action" }, "ChargePayment": { "operationId": "charge_payment", "kind": "action" }, "SendConfirmation": { "operationId": "send_confirmation", "kind": "action" } }, "edges": [ { "from": "OrderReceived", "to": "ValidateOrder", "delivery": { "mode": "direct", "packet": "OrderPayload" } }, { "from": "ValidateOrder", "to": "ReserveInventory", "delivery": { "mode": "durable", "packet": "ValidatedOrder", "retryPolicy": { "maxAttempts": 3, "backoff": "exponential", "initialDelay": "PT2S" } } }, { "from": "ReserveInventory", "to": "ChargePayment", "delivery": { "mode": "durable", "packet": "ReservationResult", "idempotencyKey": "{{payload.orderId}}-charge" } }, { "from": "ChargePayment", "to": "SendConfirmation", "delivery": { "mode": "durable", "packet": "PaymentResult", "idempotencyKey": "{{payload.orderId}}-confirm" } } ], "components": { "packets": { "...": "..." } } } ``` ## Step 7: Understand the document **`info`** — document metadata: title, version, who owns it. **`nodes`** — the graph vertices. Each node has: - `operationId` — the handler function name (snake\_case) - `kind` — the node's role (source, transform, action, etc.) **`edges`** — the graph edges connecting nodes. Each edge has: - `from` / `to` — which nodes to connect - `delivery.mode` — transport and durability (this is the key decision) **`components.packets`** — the typed schemas for data flowing between nodes. **Why `direct` for validation but `durable` for payment?** `ValidateOrder` is a fast, deterministic, in-process check. If it fails, the upstream system resends the order. `direct` is correct here — no durability needed. `ChargePayment` calls an external payment processor. 
If the process crashes between the charge and the confirmation, you need the packet to survive the restart and the idempotency key to prevent double-charging. `durable` with `idempotencyKey` is the only safe choice. ## What's next You've seen a running flow and understood its structure. Now build one from scratch: - [Your First Flow](https://flowdsl.com/docs/tutorials/your-first-flow) — build a webhook-to-Slack routing flow step by step - [Delivery Modes](https://flowdsl.com/docs/concepts/delivery-modes) — understand the five modes in depth - [Nodes](https://flowdsl.com/docs/concepts/nodes) — the nine node kinds explained # Tutorials These tutorials take you from zero to production-ready FlowDSL flows. Each one is self-contained and builds a real use case — not toy examples. By the end you will know how to design flow documents, choose delivery modes, implement custom nodes in Go or Python, and operate FlowDSL locally with Docker Compose. ## What's in this section | Tutorial | What you build | Time | | ------------------------------------------------------------------------------- | ----------------------------------------------------------------- | ------ | | [Getting Started](https://flowdsl.com/docs/tutorials/getting-started) | Load and explore the Order Fulfillment example in Studio | 5 min | | [Your First Flow](https://flowdsl.com/docs/tutorials/your-first-flow) | A webhook-to-Slack routing flow, built incrementally from scratch | 20 min | | [Email Triage Flow](https://flowdsl.com/docs/tutorials/email-triage-flow) | A stateful LLM-powered email classification workflow | 30 min | | [Sales Pipeline Flow](https://flowdsl.com/docs/tutorials/sales-pipeline-flow) | CRM lead enrichment, scoring, and routing | 30 min | | [Connecting AsyncAPI](https://flowdsl.com/docs/tutorials/connecting-asyncapi) | Reference existing AsyncAPI event schemas in FlowDSL | 15 min | | [Using the Studio](https://flowdsl.com/docs/tutorials/using-the-studio) | Full walkthrough of the FlowDSL 
visual editor | 15 min | | [Write a Go Node](https://flowdsl.com/docs/tutorials/writing-a-go-node) | Implement and register a node using the `flowdsl-go` SDK | 25 min | | [Write a Python Node](https://flowdsl.com/docs/tutorials/writing-a-python-node) | Implement and register a node using `flowdsl-py` | 25 min | | [Docker Compose Local](https://flowdsl.com/docs/tutorials/docker-compose-local) | Spin up the full FlowDSL infrastructure stack locally | 15 min | ## Prerequisites Most tutorials assume: - Basic familiarity with YAML - Docker Desktop installed (for local infrastructure tutorials) - A terminal and a code editor Node implementation tutorials additionally require: - **Go tutorials:** Go 1.21 or later - **Python tutorials:** Python 3.10 or later, `pip` You do not need expertise in Kafka, MongoDB, or Redis. FlowDSL abstracts those behind delivery mode declarations — the runtime handles transport configuration. --- New to FlowDSL? Start with [Getting Started](https://flowdsl.com/docs/tutorials/getting-started) — it takes five minutes and gives you a complete mental model before you build anything from scratch. # Build a Sales Pipeline Flow This tutorial builds a lead processing pipeline that enriches incoming leads with external data, scores them using an LLM, and routes them to the correct sales action based on their score. ## What you'll build ```mermaid flowchart TD A[LeadReceived\nsource] -->|durable| B[EnrichLead\nintegration] B -->|durable| C[ScoreLead\nllm] C -->|durable| D{RouteByScore\nrouter} D -->|hot score ≥80| E[AssignToSalesRep\naction] D -->|warm 40–79| F[AddToNurtureCampaign\naction] D -->|cold score <40| G[ArchiveLead\nterminal] E -->|durable| H[SendWelcomeEmail\naction] ``` ## The complete flow YAML ```yaml flowdsl: "1.0" info: title: Sales Pipeline version: "1.0.0" description: | Processes incoming leads: enriches from Clearbit, scores with LLM, routes hot/warm/cold to appropriate sales actions. 
nodes: LeadReceived: operationId: receive_lead kind: source summary: Receives new lead submissions from the website form or CRM webhook outputs: out: { packet: LeadPayload } EnrichLead: operationId: enrich_lead_clearbit kind: integration summary: Enriches lead data from Clearbit (company size, industry, funding) inputs: in: { packet: LeadPayload } outputs: out: { packet: EnrichedLead } settings: clearbitApiKey: "${CLEARBIT_API_KEY}" enrichFields: [company, person, employment] ScoreLead: operationId: llm_score_lead kind: llm summary: Scores the lead 0–100 based on ICP fit using LLM inputs: in: { packet: EnrichedLead } outputs: out: { packet: ScoredLead } settings: model: gpt-4o-mini temperature: 0.2 systemPrompt: | You are a B2B sales qualification expert. Score this lead 0-100 based on ideal customer profile fit. Consider: company size (prefer 50-500 employees), industry fit, funding stage, and seniority of contact. Return JSON: {"score": 0-100, "tier": "hot|warm|cold", "reasoning": "...", "topSignals": [...]} RouteByScore: operationId: route_lead_by_score kind: router summary: Routes leads based on their ICP score inputs: in: { packet: ScoredLead } outputs: hot: packet: ScoredLead description: Score ≥80 — assign to a sales rep immediately warm: packet: ScoredLead description: Score 40–79 — add to nurture campaign cold: packet: ScoredLead description: Score <40 — archive AssignToSalesRep: operationId: assign_sales_rep kind: action summary: Assigns the lead to a sales rep via round-robin in the CRM inputs: in: { packet: ScoredLead } outputs: out: { packet: AssignedLead } settings: crmSystem: salesforce assignmentStrategy: round-robin territory: us-west SendWelcomeEmail: operationId: send_welcome_email kind: action summary: Sends a personalized welcome email to the hot lead inputs: in: { packet: AssignedLead } settings: templateId: hot-lead-welcome-v3 senderName: "Sales Team" AddToNurtureCampaign: operationId: add_to_nurture_campaign kind: action summary: Enrolls the 
lead in a 6-email nurture sequence inputs: in: { packet: ScoredLead } settings: campaignId: nurture-sequence-q1-2026 startDelay: PT24H ArchiveLead: operationId: archive_cold_lead kind: terminal summary: Archives the lead with cold classification for future re-engagement inputs: in: { packet: ScoredLead } edges: # Enrich incoming lead — durable because external API call - from: LeadReceived to: EnrichLead delivery: mode: durable packet: LeadPayload idempotencyKey: "{{payload.leadId}}-enrich" retryPolicy: maxAttempts: 3 backoff: exponential initialDelay: PT3S retryOn: [TIMEOUT, RATE_LIMITED] # LLM scoring — durable + idempotency (expensive, non-deterministic) - from: EnrichLead to: ScoreLead delivery: mode: durable packet: EnrichedLead idempotencyKey: "{{payload.leadId}}-score" retryPolicy: maxAttempts: 3 backoff: exponential initialDelay: PT5S retryOn: [RATE_LIMITED, TIMEOUT] # Route to appropriate path - from: ScoreLead to: RouteByScore delivery: mode: durable packet: ScoredLead # Hot path — assign to sales rep immediately - from: RouteByScore.hot to: AssignToSalesRep delivery: mode: durable packet: ScoredLead idempotencyKey: "{{payload.leadId}}-assign" retryPolicy: maxAttempts: 3 backoff: exponential initialDelay: PT2S # Welcome email for hot leads after assignment - from: AssignToSalesRep to: SendWelcomeEmail delivery: mode: durable packet: AssignedLead idempotencyKey: "{{payload.leadId}}-welcome-email" retryPolicy: maxAttempts: 3 backoff: exponential initialDelay: PT2S # Warm path — nurture campaign - from: RouteByScore.warm to: AddToNurtureCampaign delivery: mode: durable packet: ScoredLead idempotencyKey: "{{payload.leadId}}-nurture" retryPolicy: maxAttempts: 3 backoff: exponential initialDelay: PT2S # Cold path — archive directly (fast, idempotent) - from: RouteByScore.cold to: ArchiveLead delivery: mode: direct packet: ScoredLead components: packets: LeadPayload: type: object properties: leadId: { type: string } email: { type: string, format: email } 
firstName: { type: string } lastName: { type: string } company: { type: string } title: { type: string } phone: { type: string } source: type: string enum: [website, referral, event, linkedin, cold-outreach] submittedAt: { type: string, format: date-time } required: [leadId, email, company, source, submittedAt] EnrichedLead: type: object properties: lead: $ref: "#/components/packets/LeadPayload" clearbit: type: object properties: companySize: type: integer description: Number of employees industry: { type: string } fundingStage: { type: string } annualRevenue: { type: number } technologies: { type: array, items: { type: string } } additionalProperties: true enrichedAt: { type: string, format: date-time } required: [lead, enrichedAt] ScoredLead: type: object properties: lead: $ref: "#/components/packets/EnrichedLead" score: type: integer minimum: 0 maximum: 100 tier: type: string enum: [hot, warm, cold] reasoning: { type: string } topSignals: type: array items: { type: string } scoredAt: { type: string, format: date-time } required: [lead, score, tier, scoredAt] AssignedLead: type: object properties: lead: $ref: "#/components/packets/ScoredLead" assignedTo: type: object properties: repId: { type: string } repName: { type: string } repEmail: { type: string, format: email } required: [repId, repName, repEmail] crmLeadId: { type: string } assignedAt: { type: string, format: date-time } required: [lead, assignedTo, assignedAt] ``` ## Delivery mode decisions | Edge | Mode | Why | | ---------------------------------------- | ------- | ------------------------------------ | | LeadReceived → EnrichLead | durable | External API call, loss unacceptable | | EnrichLead → ScoreLead | durable | Expensive LLM call with idempotency | | ScoreLead → RouteByScore | durable | Business-critical classification | | RouteByScore.hot → AssignToSalesRep | durable | CRM write, must not duplicate | | AssignToSalesRep → SendWelcomeEmail | durable | Email send, must not duplicate | | 
RouteByScore.warm → AddToNurtureCampaign | durable | Campaign enrollment, idempotent key  |
| RouteByScore.cold → ArchiveLead          | direct  | Fast, idempotent, no external calls  |

## Next steps

- [LLM Flows](https://flowdsl.com/docs/guides/llm-flows) — deep patterns for LLM scoring and classification
- [Idempotency](https://flowdsl.com/docs/guides/idempotency) — preventing duplicate CRM entries and emails
- [Email Triage Flow](https://flowdsl.com/docs/tutorials/email-triage-flow) — another real-world stateful flow

# Using FlowDSL Studio

FlowDSL Studio is the official visual editor for FlowDSL flows. It renders a `.flowdsl.yaml` document as an interactive canvas where you can explore, edit, and validate flows. The YAML file is always the source of truth — Studio is a view on top of it.

## What Studio is

Studio is a React application built on [React Flow](https://reactflow.dev){rel="nofollow"}. It runs as a web app at `https://flowdsl.com/studio` or locally via Docker Compose at `http://localhost:5173`. Studio does NOT generate code or manage infrastructure. It edits FlowDSL documents and validates them against the spec schema.

## Opening Studio

**Cloud (no setup required):** Navigate to <https://flowdsl.com/studio>.

**Local (full infrastructure):**

```bash
cd examples && make up-infra
open http://localhost:5173
```

## The canvas

![Studio canvas overview](https://flowdsl.com/img/docs/studio-canvas-overview.png)

The canvas has four areas:

| Area             | Location              | What it shows                                |
| ---------------- | --------------------- | -------------------------------------------- |
| **Toolbar**      | Top                   | File menu, Validate, Export, Zoom, Fit view  |
| **Canvas**       | Center                | The flow graph — nodes and edges             |
| **Node palette** | Right                 | Available node kinds to drag onto the canvas |
| **Inspector**    | Right (when selected) | Properties of the selected node or edge      |

## Creating nodes

**From the palette:**

1. Click a node kind in the right panel (source, transform, router, etc.)
2.
Drag it onto the canvas
3. Double-click to open the node editor
4. Fill in `operationId`, `summary`, ports

**From YAML:** Edit the `.flowdsl.yaml` file directly. Studio reloads automatically if the file is open in a connected editor via the watch mode.

## Connecting nodes (drawing edges)

1. Hover over a node — small circles appear on its output ports
2. Click and drag from an output port to an input port of another node
3. A dialog appears to configure the edge's delivery policy
4. Select the delivery mode and configure optional fields (packet, retry policy, idempotency key)

![Drawing an edge in Studio](https://flowdsl.com/img/docs/studio-draw-edge.png)

## Setting delivery modes on edges

Right-click an edge → **Edge Properties** to open the delivery policy editor:

- **Mode** — dropdown: direct, ephemeral, checkpoint, durable, stream
- **Packet** — autocomplete from `components.packets` and any AsyncAPI references
- **Retry policy** — toggle to add retry settings
- **Idempotency key** — template string for deduplication

Each mode has a distinct color on the canvas:

- `direct` — gray
- `ephemeral` — blue
- `checkpoint` — purple
- `durable` — green
- `stream` — orange

## Validating a flow

Click the **Validate** button in the toolbar (or press `Ctrl+Shift+V`). The validator runs the FlowDSL JSON Schema check plus semantic rules:

- All node names are unique
- All `operationId` values are unique
- All packet references resolve
- No cycles in the graph (FlowDSL requires a DAG)
- All router outputs are connected to at least one edge

A green **Valid** badge appears on success. Errors appear in the validation panel with line numbers pointing to the YAML.

![Validation panel showing errors](https://flowdsl.com/img/docs/studio-validation-errors.png)

## Exporting

**File → Export → YAML** — exports the canonical `.flowdsl.yaml`

**File → Export → JSON** — exports the canonical `.flowdsl.json`

Both formats are equivalent. JSON is what the runtime loads; YAML is for human authoring.
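One of the semantic rules above — no cycles — reduces to a depth-first search over the edge list. A self-contained sketch of how a validator might implement it (this is the standard three-color DFS, not Studio's actual code):

```go
package main

import "fmt"

// hasCycle reports whether the directed graph given as from→to edge
// pairs contains a cycle, using three-color depth-first search.
func hasCycle(edges [][2]string) bool {
	adj := map[string][]string{}
	nodes := map[string]bool{}
	for _, e := range edges {
		adj[e[0]] = append(adj[e[0]], e[1])
		nodes[e[0]], nodes[e[1]] = true, true
	}
	const (white, gray, black = 0, 1, 2)
	color := map[string]int{}
	var visit func(n string) bool
	visit = func(n string) bool {
		color[n] = gray // gray = on the current DFS path
		for _, m := range adj[n] {
			if color[m] == gray {
				return true // back edge to the current path → cycle
			}
			if color[m] == white && visit(m) {
				return true
			}
		}
		color[n] = black // fully explored
		return false
	}
	for n := range nodes {
		if color[n] == white && visit(n) {
			return true
		}
	}
	return false
}

func main() {
	// The Order Fulfillment edges form a straight line — no cycle.
	fmt.Println(hasCycle([][2]string{
		{"OrderReceived", "ValidateOrder"},
		{"ValidateOrder", "ReserveInventory"},
		{"ReserveInventory", "ChargePayment"},
		{"ChargePayment", "SendConfirmation"},
	})) // false

	// Adding a back edge introduces a cycle.
	fmt.Println(hasCycle([][2]string{{"A", "B"}, {"B", "C"}, {"C", "A"}})) // true
}
```

Studio's validator reports the offending edge with YAML line numbers; the sketch only answers yes/no.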
## Importing flows **File → Open** — open a local `.flowdsl.yaml` or `.flowdsl.json` file **File → Open Example** — load one of the built-in example flows **File → Import from URL** — load a flow from a public URL or GitHub raw link ## The node inspector panel Click any node to open the inspector on the right: - **Kind badge** — color-coded kind (source, transform, router, etc.) - **operationId** — editable snake\_case identifier - **Summary** — short one-line description - **Input ports** — list of inputs with packet types, expandable to show schema - **Output ports** — list of outputs with packet types - **Settings** — static configuration fields The inspector renders the node as a **NodeContractCard** — the bilateral contract visualization unique to FlowDSL. ## The execution monitor When the runtime is running locally (requires [Docker Compose Local](https://flowdsl.com/docs/tutorials/docker-compose-local)), Studio shows a live execution monitor: - Real-time event stream for each flow execution - Per-node status: waiting, running, completed, failed - Packet payload inspection (click any node to see its last input/output) - Dead letter queue inspection - Retry count per edge ![Execution monitor in Studio](https://flowdsl.com/img/docs/studio-execution-monitor.png) ## Keyboard shortcuts | Shortcut | Action | | ------------------- | ---------------------------- | | `Ctrl+S` | Save (writes YAML to disk) | | `Ctrl+Shift+V` | Validate | | `Ctrl+E` | Export to JSON | | `Ctrl+Z` / `Ctrl+Y` | Undo / Redo | | `Space + drag` | Pan the canvas | | `Ctrl+scroll` | Zoom in/out | | `Ctrl+Shift+F` | Fit all nodes in view | | `Delete` | Delete selected node or edge | | `Escape` | Deselect | ## Summary - Studio edits FlowDSL documents visually — the YAML is always the source of truth - Draw edges by dragging from output ports to input ports - Right-click edges to set delivery modes and retry policies - Validate before deploying — the validator catches schema errors and semantic 
problems - The execution monitor shows live flow execution when connected to a local runtime ## Next steps - [Getting Started](https://flowdsl.com/docs/tutorials/getting-started) — run your first flow - [Your First Flow](https://flowdsl.com/docs/tutorials/your-first-flow) — build a flow and see it in Studio - [Docker Compose Local](https://flowdsl.com/docs/tutorials/docker-compose-local) — enable the execution monitor # Write a FlowDSL Node in Go This tutorial implements the `filter_by_priority` node from the webhook router tutorial using the `flowdsl-go` SDK. By the end you'll have a running node that the FlowDSL runtime can connect to. ## What you'll build A `FilterNode` that reads the `priority` field from an incoming payload and routes the packet to one of two named outputs: `urgent_out` (P0/P1) or `normal_out` (P2+). ## Prerequisites - Go 1.21 or later: `go version` - FlowDSL runtime running locally (see [Docker Compose Local](https://flowdsl.com/docs/tutorials/docker-compose-local)) ## Step 1: Initialize the project ```bash mkdir flowdsl-filter-node cd flowdsl-filter-node go mod init github.com/myorg/flowdsl-filter-node go get github.com/flowdsl/flowdsl-go ``` ## Step 2: Project structure ```text flowdsl-filter-node/ ├── main.go ├── node.go ├── flowdsl-node.json └── go.mod ``` ## Step 3: Implement the NodeHandler interface Create `node.go`: ```go package main import ( "context" "fmt" flowdsl "github.com/flowdsl/flowdsl-go" ) // FilterNode implements the filter_by_priority operation. // It reads the "priority" field from the input packet and routes // P0/P1 events to "urgent_out", P2+ to "normal_out". type FilterNode struct { urgentPriorities map[string]bool } // OperationID returns the snake_case identifier that matches the // operationId in the FlowDSL document. func (n *FilterNode) OperationID() string { return "filter_by_priority" } // Init is called once at startup with the node's static settings. 
func (n *FilterNode) Init(settings flowdsl.Settings) error { urgentList, _ := settings.GetStringSlice("urgentPriorities") if len(urgentList) == 0 { urgentList = []string{"P0", "P1"} } n.urgentPriorities = make(map[string]bool, len(urgentList)) for _, p := range urgentList { n.urgentPriorities[p] = true } return nil } // Handle is called once per incoming packet. func (n *FilterNode) Handle(ctx context.Context, input flowdsl.NodeInput) (flowdsl.NodeOutput, error) { // Read the input packet from the "in" port payload, err := input.Packet("in") if err != nil { return flowdsl.NodeOutput{}, fmt.Errorf("filter_by_priority: reading input: %w", err) } // Extract the priority field priority, ok := payload.GetString("priority") if !ok { // Missing priority — treat as normal priority = "P2" } // Route based on priority if n.urgentPriorities[priority] { return flowdsl.NodeOutput{}.Send("urgent_out", payload), nil } return flowdsl.NodeOutput{}.Send("normal_out", payload), nil } ``` ### Key interfaces **`flowdsl.NodeInput`** — the input wrapper. Methods: - `Packet(portName string) (flowdsl.Packet, error)` — read a packet from a named input port - `Context() context.Context` — the execution context with tracing and cancellation **`flowdsl.Packet`** — the packet wrapper. Methods: - `GetString(key string) (string, bool)` — read a string field - `GetInt(key string) (int64, bool)` — read an integer field - `GetFloat(key string) (float64, bool)` — read a float field - `GetBool(key string) (bool, bool)` — read a bool field - `GetMap(key string) (map[string]any, bool)` — read a nested object - `Data() map[string]any` — get the raw underlying map **`flowdsl.NodeOutput`** — the output builder. 
Methods: - `Send(portName string, packet flowdsl.Packet) NodeOutput` — route a packet to a named output port - `SendMap(portName string, data map[string]any) NodeOutput` — send from raw map ## Step 4: Create the entry point Create `main.go`: ```go package main import ( "log/slog" "os" flowdsl "github.com/flowdsl/flowdsl-go" ) func main() { logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ Level: slog.LevelInfo, })) server := flowdsl.NewNodeServer( flowdsl.WithLogger(logger), flowdsl.WithGRPCPort(50051), flowdsl.WithManifestFile("flowdsl-node.json"), ) server.Register(&FilterNode{}) logger.Info("starting filter-node gRPC server", "port", 50051) if err := server.ServeGRPC(); err != nil { logger.Error("server failed", "error", err) os.Exit(1) } } ``` The node server now starts a gRPC server on port 50051 (the default for Go nodes). The runtime connects to this port to invoke the node via the `NodeService` gRPC contract. See [gRPC Protocol](https://flowdsl.com/docs/reference/grpc-protocol) for details. 
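The `NodeOutput` builder from Step 3 is a fluent, value-type accumulator: each `Send` appends a port→packet routing and returns the updated output, which is what lets `Handle` end with `return flowdsl.NodeOutput{}.Send("urgent_out", payload), nil`. A standalone model of that pattern (illustrative only — the type and field names here are not the actual `flowdsl-go` internals):

```go
package main

import "fmt"

// portSend records one routed packet: which output port, and its data.
type portSend struct {
	port string
	data map[string]any
}

// nodeOutput models a fluent output builder in the style of
// flowdsl.NodeOutput: Send appends a routing and returns the result,
// so calls chain naturally and the zero value is ready to use.
type nodeOutput struct {
	sends []portSend
}

func (o nodeOutput) Send(port string, data map[string]any) nodeOutput {
	o.sends = append(o.sends, portSend{port: port, data: data})
	return o
}

func main() {
	payload := map[string]any{"priority": "P0"}
	out := nodeOutput{}.Send("urgent_out", payload)
	fmt.Println(len(out.sends), out.sends[0].port) // 1 urgent_out
}
```

Because `Send` takes and returns the struct by value, a handler can build the output in a single expression without a constructor, which matches how the SDK's zero-value `flowdsl.NodeOutput{}` is used throughout this tutorial.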
## Step 5: Write the manifest

Create `flowdsl-node.json`:

```json
{
  "operationId": "filter_by_priority",
  "name": "Filter by Priority",
  "version": "1.0.0",
  "description": "Routes packets to urgent_out (P0/P1) or normal_out (P2+) based on the priority field",
  "runtime": "go",
  "inputs": [
    {
      "name": "in",
      "packet": "TransformedPayload",
      "description": "Incoming event payload with a priority field"
    }
  ],
  "outputs": [
    {
      "name": "urgent_out",
      "packet": "TransformedPayload",
      "description": "P0 and P1 events"
    },
    {
      "name": "normal_out",
      "packet": "TransformedPayload",
      "description": "P2 and below events"
    }
  ],
  "settings": {
    "type": "object",
    "properties": {
      "urgentPriorities": {
        "type": "array",
        "items": { "type": "string" },
        "default": ["P0", "P1"],
        "description": "Priority codes that route to urgent_out"
      }
    }
  },
  "author": "My Team",
  "license": "Apache-2.0",
  "tags": ["routing", "priority", "filter"]
}
```

## Step 6: Build and run

```bash
go build -o filter-node .
./filter-node
```

```json
{"time":"2026-03-28T10:00:00Z","level":"INFO","msg":"starting filter-node gRPC server","port":50051}
```

## Step 7: Register with the runtime

Add the node to your `node-registry.yaml`:

```yaml
nodes:
  filter_by_priority:
    address: localhost:50051
    transport: grpc
    version: "1.0.0"
    runtime: go
```

## Step 8: Test with a sample flow

Start the runtime with the webhook router flow:

```bash
FLOWDSL_REGISTRY_FILE=./node-registry.yaml \
  flowdsl-runtime start webhook-router.flowdsl.yaml
```

Send a test event:

```bash
curl -X POST http://localhost:8081/flows/webhook-router/trigger \
  -H "Content-Type: application/json" \
  -d '{
    "priority": "P0",
    "title": "Production database unreachable",
    "source": "alertmanager",
    "timestamp": "2026-03-28T10:00:00Z"
  }'
```

Check the execution log — you should see the `FilterNode` routing the packet to `urgent_out`.
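The routing decision that the sample event exercises can be restated as plain Python. This is a standalone sketch of the handler's logic for checking your expectations, not SDK code:

```python
URGENT_PRIORITIES = {"P0", "P1"}  # mirrors the urgentPriorities default


def route(payload: dict) -> str:
    """Return the output port a payload would be routed to.

    A missing priority field is treated as "P2" (normal), matching
    the fallback in FilterNode.Handle.
    """
    priority = payload.get("priority", "P2")
    return "urgent_out" if priority in URGENT_PRIORITIES else "normal_out"


assert route({"priority": "P0", "title": "Production database unreachable"}) == "urgent_out"
assert route({"priority": "P3"}) == "normal_out"
assert route({}) == "normal_out"  # missing priority defaults to P2
```

If you override `urgentPriorities` in the node's settings, the membership set changes but the decision shape stays the same.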
## Error handling patterns

```go
func (n *FilterNode) Handle(ctx context.Context, input flowdsl.NodeInput) (flowdsl.NodeOutput, error) {
	payload, err := input.Packet("in")
	if err != nil {
		// Return a typed FlowDSL error for proper dead letter categorization
		return flowdsl.NodeOutput{}, flowdsl.NewNodeError(
			flowdsl.ErrCodeInputMissing,
			"missing input packet on port 'in'",
			err,
		)
	}

	priority, ok := payload.GetString("priority")
	if !ok {
		// Return a validation error — this will NOT be retried (it's a data problem)
		return flowdsl.NodeOutput{}, flowdsl.NewNodeError(
			flowdsl.ErrCodeValidation,
			"priority field missing from payload",
			nil,
		)
	}

	// ... routing logic
}
```

FlowDSL error codes:

- `ErrCodeValidation` — data problem, not retriable
- `ErrCodeTimeout` — transient, retriable
- `ErrCodeRateLimited` — transient, retriable
- `ErrCodeTemporary` — transient, retriable
- `ErrCodePermanent` — permanent failure, move to dead letter immediately

## Logging and observability

```go
func (n *FilterNode) Handle(ctx context.Context, input flowdsl.NodeInput) (flowdsl.NodeOutput, error) {
	// Extract FlowDSL trace context for correlated logging
	traceCtx := flowdsl.TraceFromContext(ctx)
	logger := slog.With(
		"flowId", traceCtx.FlowID,
		"executionId", traceCtx.ExecutionID,
		"nodeId", "FilterByPriority",
	)

	payload, err := input.Packet("in")
	if err != nil {
		logger.Error("failed to read input packet", "error", err)
		return flowdsl.NodeOutput{}, err
	}

	priority, _ := payload.GetString("priority")
	logger.Info("routing packet", "priority", priority)
	// ...
}
```

## Summary

| File                | Purpose                                           |
| ------------------- | ------------------------------------------------- |
| `node.go`           | Node handler implementing `flowdsl.NodeHandler`   |
| `main.go`           | Node server that registers and serves the handler |
| `flowdsl-node.json` | Manifest describing the node to the registry      |

## Next steps

- [Write a Python Node](https://flowdsl.com/docs/tutorials/writing-a-python-node) — the same node in Python
- [Node Development](https://flowdsl.com/docs/guides/node-development) — testing, versioning, publishing
- [Node Manifest reference](https://flowdsl.com/docs/reference/node-manifest) — full manifest field reference

# Write a FlowDSL Node in Python

This tutorial implements the `llm_analyze_email` node from the email triage tutorial using the `flowdsl-py` SDK. By the end you'll have an async Python node that calls OpenAI to classify emails and returns structured output.

## What you'll build

An `LlmAnalyzerNode` that reads an email payload, sends it to an LLM for classification, and returns a structured `AnalysisResult` with classification, confidence score, and reason.

## Prerequisites

- Python 3.10 or later: `python --version`
- `pip` package manager
- An OpenAI API key (or compatible API)

## Step 1: Install the SDK

```bash
pip install flowdsl-py openai
```

## Step 2: Project structure

```text
llm-analyzer-node/
├── main.py
├── node.py
├── flowdsl-node.json
└── requirements.txt
```

`requirements.txt`:

```text
flowdsl-py>=1.0.0
openai>=1.0.0
```

## Step 3: Implement the node

Create `node.py`:

```python
import json
import time
from typing import Any

from openai import AsyncOpenAI

from flowdsl import BaseNode, NodeInput, NodeOutput, NodeError, ErrorCode


class LlmAnalyzerNode(BaseNode):
    """
    Classifies an email as urgent, normal, or spam using an LLM.

    operationId: llm_analyze_email
    """

    operation_id = "llm_analyze_email"

    def __init__(self) -> None:
        self._client: AsyncOpenAI | None = None
        self._model: str = "gpt-4o-mini"
        self._temperature: float = 0.1
        self._system_prompt: str = self._default_system_prompt()

    async def init(self, settings: dict[str, Any]) -> None:
        """Called once at startup with the node's static settings."""
        self._model = settings.get("model", "gpt-4o-mini")
        self._temperature = settings.get("temperature", 0.1)
        custom_prompt = settings.get("systemPrompt")
        if custom_prompt:
            self._system_prompt = custom_prompt
        self._client = AsyncOpenAI()  # reads OPENAI_API_KEY from env

    async def handle(self, input: NodeInput) -> NodeOutput:
        """Called once per incoming email packet."""
        # Read the input packet from the "in" port
        payload = await input.packet("in")
        message_id = payload.get("messageId", "unknown")
        subject = payload.get("subject", "")
        body = payload.get("body", "")

        if not body and not subject:
            raise NodeError(
                ErrorCode.VALIDATION,
                "Email payload has neither subject nor body",
            )

        # Call the LLM
        result = await self._classify(subject, body, message_id)

        # Build the output AnalysisResult
        output_data = {
            "email": payload.data,
            "classification": result["classification"],
            "confidence": result["confidence"],
            "reason": result.get("reason", ""),
            "analyzedAt": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
        }
        return NodeOutput().send("out", output_data)

    async def _classify(
        self, subject: str, body: str, message_id: str
    ) -> dict[str, Any]:
        """Calls the LLM and parses the classification response."""
        if self._client is None:
            raise NodeError(ErrorCode.TEMPORARY, "LLM client not initialized")

        prompt = f"Subject: {subject}\n\nBody:\n{body}"
        try:
            response = await self._client.chat.completions.create(
                model=self._model,
                temperature=self._temperature,
                messages=[
                    {"role": "system", "content": self._system_prompt},
                    {"role": "user", "content": prompt},
                ],
                response_format={"type": "json_object"},
            )
        except Exception as e:
            # Check if this is a rate limit error
            if "rate_limit" in str(e).lower():
                raise NodeError(
                    ErrorCode.RATE_LIMITED,
                    f"OpenAI rate limit hit for message {message_id}",
                    original=e,
                )
            raise NodeError(
                ErrorCode.TEMPORARY,
                f"LLM call failed for message {message_id}",
                original=e,
            )

        raw = response.choices[0].message.content
        try:
            parsed = json.loads(raw)
        except json.JSONDecodeError as e:
            raise NodeError(
                ErrorCode.TEMPORARY,
                f"LLM returned invalid JSON for message {message_id}: {raw!r}",
                original=e,
            )

        classification = parsed.get("classification", "").lower()
        if classification not in {"urgent", "normal", "spam"}:
            # Treat unexpected classification as normal (safe default)
            classification = "normal"

        return {
            "classification": classification,
            "confidence": float(parsed.get("confidence", 0.5)),
            "reason": parsed.get("reason", ""),
        }

    @staticmethod
    def _default_system_prompt() -> str:
        return """You are an expert support email classifier.
Classify the email as exactly one of: urgent, normal, or spam.

Urgent: production outages, security incidents, data loss, legal issues.
Normal: feature requests, bug reports, billing questions, general support.
Spam: promotional emails, irrelevant content, automated notifications.

Respond with JSON: {"classification": "urgent|normal|spam", "confidence": 0.0-1.0, "reason": "brief explanation"}"""
```

### Key classes

**`BaseNode`** — base class for all FlowDSL nodes. Override:

- `operation_id: str` — class variable, matches the `operationId` in the flow document
- `async def init(self, settings: dict) -> None` — called once at startup with static settings
- `async def handle(self, input: NodeInput) -> NodeOutput` — called once per packet

**`NodeInput`** — input wrapper. Methods:

- `await input.packet(port_name: str) -> Packet` — read a packet from a named input port
- `input.context` — the execution context (`flow_id`, `execution_id`, trace headers)

**`Packet`** — the packet wrapper.
Properties:

- `packet.data: dict[str, Any]` — the raw underlying dict
- `packet.get(key, default=None)` — read a field with optional default

**`NodeOutput`** — the output builder. Methods:

- `NodeOutput().send(port_name: str, data: dict | Packet) -> NodeOutput` — route to a named port

**`NodeError`** — typed errors for proper runtime handling. ErrorCodes:

- `ErrorCode.VALIDATION` — data problem, not retriable
- `ErrorCode.RATE_LIMITED` — retriable, rate limit
- `ErrorCode.TIMEOUT` — retriable, timeout
- `ErrorCode.TEMPORARY` — retriable, transient failure
- `ErrorCode.PERMANENT` — permanent, move to dead letter

## Step 4: Create the entry point

Create `main.py`:

```python
import asyncio
import logging
import os

from flowdsl import NodeServer

from node import LlmAnalyzerNode


async def main() -> None:
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(name)s %(message)s",
    )
    logger = logging.getLogger("llm-analyzer-node")

    grpc_port = int(os.getenv("FLOWDSL_GRPC_PORT", "50052"))
    server = NodeServer(
        grpc_port=grpc_port,
        manifest_file="flowdsl-node.json",
        logger=logger,
    )
    server.register(LlmAnalyzerNode())

    logger.info("starting llm-analyzer-node gRPC server on port %d", grpc_port)
    await server.serve_grpc()


if __name__ == "__main__":
    asyncio.run(main())
```

The node server starts a gRPC server on port 50052 (default for Python nodes). The runtime connects via the `NodeService` gRPC contract. See [gRPC Protocol](https://flowdsl.com/docs/reference/grpc-protocol) for details.
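The retriability rules listed for the `NodeError` codes can be made explicit in a few lines. This is a standalone sketch with a re-declared enum for illustration, not the SDK's `ErrorCode`:

```python
from enum import Enum


class ErrorCode(Enum):
    VALIDATION = "validation"      # data problem, not retriable
    RATE_LIMITED = "rate_limited"  # transient, retriable
    TIMEOUT = "timeout"            # transient, retriable
    TEMPORARY = "temporary"        # transient, retriable
    PERMANENT = "permanent"        # permanent, dead-letter immediately


RETRIABLE = {ErrorCode.RATE_LIMITED, ErrorCode.TIMEOUT, ErrorCode.TEMPORARY}


def should_retry(code: ErrorCode) -> bool:
    """VALIDATION and PERMANENT skip retries and go to the dead letter queue."""
    return code in RETRIABLE


assert should_retry(ErrorCode.RATE_LIMITED)
assert not should_retry(ErrorCode.VALIDATION)
```

Choosing the right code in your handler is what lets the runtime distinguish "retry this" from "this packet will never succeed".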
## Step 5: Write the manifest

Create `flowdsl-node.json`:

```json
{
  "operationId": "llm_analyze_email",
  "name": "LLM Email Analyzer",
  "version": "1.0.0",
  "description": "Classifies support emails as urgent, normal, or spam using an LLM",
  "runtime": "python",
  "inputs": [
    {
      "name": "in",
      "packet": "EmailPayload",
      "description": "The email to classify"
    }
  ],
  "outputs": [
    {
      "name": "out",
      "packet": "AnalysisResult",
      "description": "Classification result with confidence score"
    }
  ],
  "settings": {
    "type": "object",
    "properties": {
      "model": {
        "type": "string",
        "default": "gpt-4o-mini",
        "description": "OpenAI model to use for classification"
      },
      "temperature": {
        "type": "number",
        "default": 0.1,
        "minimum": 0,
        "maximum": 2
      },
      "systemPrompt": {
        "type": "string",
        "description": "Custom system prompt. Uses default if not provided."
      }
    }
  },
  "author": "My Team",
  "license": "Apache-2.0",
  "tags": ["llm", "email", "classification", "nlp"]
}
```

## Step 6: Run the node

```bash
OPENAI_API_KEY=sk-... python main.py
```

```text
2026-03-28 10:00:00 INFO llm-analyzer-node starting llm-analyzer-node gRPC server on port 50052
2026-03-28 10:00:00 INFO flowdsl.server registered operation_id=llm_analyze_email
2026-03-28 10:00:00 INFO flowdsl.server listening grpc_port=50052
```

## Step 7: Register with the runtime

Add to `node-registry.yaml`:

```yaml
nodes:
  llm_analyze_email:
    address: localhost:50052
    transport: grpc
    version: "1.0.0"
    runtime: python
```

## Step 8: Testing with pytest

Create `test_node.py`:

```python
import pytest
from unittest.mock import AsyncMock, patch

from flowdsl import ErrorCode, NodeError
from flowdsl.testing import MockNodeInput

from node import LlmAnalyzerNode


@pytest.fixture
def node():
    return LlmAnalyzerNode()


@pytest.mark.asyncio
async def test_classifies_urgent_email(node):
    await node.init({"model": "gpt-4o-mini"})
    mock_llm_response = '{"classification": "urgent", "confidence": 0.97, "reason": "Production outage"}'
    email_payload = {
        "messageId": "msg-001",
        "from": "user@example.com",
        "subject": "Database is down",
        "body": "Production database is unreachable. All requests failing.",
        "receivedAt": "2026-03-28T10:00:00Z",
    }

    with patch.object(
        node._client.chat.completions,
        "create",
        return_value=AsyncMock(
            choices=[AsyncMock(message=AsyncMock(content=mock_llm_response))]
        ),
    ):
        input_ = MockNodeInput({"in": email_payload})
        output = await node.handle(input_)

    assert output.packets["out"]["classification"] == "urgent"
    assert output.packets["out"]["confidence"] == 0.97
    assert output.packets["out"]["email"]["messageId"] == "msg-001"


@pytest.mark.asyncio
async def test_handles_rate_limit(node):
    await node.init({})

    with patch.object(
        node._client.chat.completions,
        "create",
        side_effect=Exception("rate_limit exceeded"),
    ):
        input_ = MockNodeInput(
            {
                "in": {
                    "messageId": "msg-002",
                    "subject": "test",
                    "body": "test",
                    "receivedAt": "2026-03-28T10:00:00Z",
                }
            }
        )
        with pytest.raises(NodeError) as exc_info:
            await node.handle(input_)

    assert exc_info.value.code == ErrorCode.RATE_LIMITED
```

```bash
pip install pytest pytest-asyncio
pytest test_node.py -v
```

## Idempotency in Python nodes

For nodes with external side effects (email sends, ticket creation), implement idempotency by checking the idempotency key before performing the action:

```python
async def handle(self, input: NodeInput) -> NodeOutput:
    payload = await input.packet("in")

    # The runtime passes the idempotency key from the edge policy
    idempotency_key = input.context.idempotency_key

    if idempotency_key:
        # Check if we already processed this key
        already_done = await self._check_idempotency(idempotency_key)
        if already_done:
            # Return the cached result without calling the external API again
            result = await self._get_cached_result(idempotency_key)
            return NodeOutput().send("out", result)

    # Perform the actual operation
    result = await self._send_sms(payload)

    if idempotency_key:
        await self._store_result(idempotency_key, result)

    return NodeOutput().send("out", result)
```

## Summary

| File                | Purpose                                        |
| ------------------- | ---------------------------------------------- |
| `node.py`           | `LlmAnalyzerNode` implementing `BaseNode`      |
| `main.py`           | Node server that registers and serves the node |
| `flowdsl-node.json` | Manifest for the registry                      |
| `test_node.py`      | Unit tests with mocked LLM                     |

## Next steps

- [LLM Flows](https://flowdsl.com/docs/guides/llm-flows) — patterns for building AI agent pipelines
- [Idempotency](https://flowdsl.com/docs/guides/idempotency) — implementing safe idempotent handlers
- [Python SDK Reference](https://flowdsl.com/docs/tools/python-sdk) — full SDK API reference

# Build Your First FlowDSL Flow

In this tutorial you will build a complete FlowDSL flow from an empty file, adding one node at a time and explaining every decision. The final flow routes incoming webhook events to Slack based on their priority level.

## What you'll build

```mermaid
flowchart LR
    A[WebhookReceiver\nsource] -->|direct| B[JsonTransformer\ntransform]
    B -->|durable| C{PriorityFilter\nrouter}
    C -->|urgent| D[UrgentSlackNotifier\naction]
    C -->|normal| E[NormalSlackNotifier\naction]
```

A webhook receives a JSON POST → a transformer extracts key fields → a filter routes based on priority → the appropriate Slack channel is notified.

## Prerequisites

- A text editor
- Basic YAML familiarity
- `ajv-cli` for validation (optional): `npm install -g ajv-cli`

## Step 1: Document skeleton

Create a file called `webhook-router.flowdsl.yaml`:

```yaml
flowdsl: "1.0"

info:
  title: Webhook to Slack Router
  version: "1.0.0"
  description: |
    Routes incoming webhook events to Slack channels based on priority.
    Urgent events go to #incidents, normal events go to #notifications.
  externalDocs:
    url: https://github.com/myorg/event-schemas/blob/main/asyncapi.yaml
    description: AsyncAPI event schema definitions

nodes: {}
edges: []

components:
  packets: {}
```

**Why `externalDocs`?** It documents where the event schemas live, even if you're not referencing them directly in this file.
It makes the document self-documenting for future readers.

## Step 2: Add the WebhookReceiver source node

```yaml
flowdsl: "1.0"

info:
  title: Webhook to Slack Router
  version: "1.0.0"

nodes:
  WebhookReceiver:
    operationId: receive_webhook
    kind: source
    summary: Receives incoming webhook POST requests from external systems
    outputs:
      out:
        packet: WebhookPayload
        description: The raw webhook body

edges: []

components:
  packets: {}
```

**Why `kind: source`?** Source nodes have no incoming edges — they are entry points triggered by external events. The runtime registers `receive_webhook` as a handler for incoming HTTP POST requests on the webhook endpoint.

## Step 3: Add the JsonTransformer node and edge

```yaml
flowdsl: "1.0"

info:
  title: Webhook to Slack Router
  version: "1.0.0"

nodes:
  WebhookReceiver:
    operationId: receive_webhook
    kind: source
    outputs:
      out: { packet: WebhookPayload }

  JsonTransformer:
    operationId: transform_webhook_fields
    kind: transform
    summary: Extracts and normalizes key fields from the raw webhook body
    inputs:
      in: { packet: WebhookPayload }
    outputs:
      out: { packet: TransformedPayload }

edges:
  - from: WebhookReceiver
    to: JsonTransformer
    delivery:
      mode: direct
      packet: WebhookPayload

components:
  packets: {}
```

**Why `direct` here?** `JsonTransformer` is a cheap, deterministic, in-process field extraction. There's no external call, no side effect, and no need to survive a process crash. `direct` is the right mode — zero overhead.
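To make the transform step concrete, here is a hypothetical sketch of what `transform_webhook_fields` might do, assuming the `WebhookPayload` and `TransformedPayload` shapes this tutorial defines in its packet schemas. The field mapping is illustrative, not the actual node implementation:

```python
import uuid


def transform_webhook_fields(raw: dict) -> dict:
    """Normalize a raw WebhookPayload into a TransformedPayload-shaped dict.

    Illustrative only: generates an id, copies the key fields, and derives
    isUrgent from the priority code.
    """
    return {
        "id": str(uuid.uuid4()),
        "source": raw["source"],
        "priority": raw["priority"],
        "isUrgent": raw["priority"] in ("P0", "P1"),
        "title": raw["title"],
        "body": raw.get("body", ""),
        "receivedAt": raw["timestamp"],
    }


event = {"source": "alertmanager", "eventType": "alert", "priority": "P0",
         "title": "DB down", "timestamp": "2026-03-28T10:00:00Z"}
assert transform_webhook_fields(event)["isUrgent"] is True
```

Because this is pure, deterministic, and in-process, there is nothing for a durable delivery mode to protect — which is the argument for `direct` above.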
## Step 4: Add the PriorityFilter router and conditional edges

```yaml
flowdsl: "1.0"

info:
  title: Webhook to Slack Router
  version: "1.0.0"

nodes:
  WebhookReceiver:
    operationId: receive_webhook
    kind: source
    outputs:
      out: { packet: WebhookPayload }

  JsonTransformer:
    operationId: transform_webhook_fields
    kind: transform
    inputs:
      in: { packet: WebhookPayload }
    outputs:
      out: { packet: TransformedPayload }

  PriorityFilter:
    operationId: filter_by_priority
    kind: router
    summary: Routes events to urgent or normal Slack channels based on priority field
    inputs:
      in: { packet: TransformedPayload }
    outputs:
      urgent:
        packet: TransformedPayload
        description: "P0 and P1 events → #incidents"
      normal:
        packet: TransformedPayload
        description: "P2+ events → #notifications"

edges:
  - from: WebhookReceiver
    to: JsonTransformer
    delivery:
      mode: direct
      packet: WebhookPayload

  - from: JsonTransformer
    to: PriorityFilter
    delivery:
      mode: direct
      packet: TransformedPayload

  - from: PriorityFilter.urgent
    to: UrgentSlackNotifier
    delivery:
      mode: durable
      packet: TransformedPayload
      retryPolicy:
        maxAttempts: 3
        backoff: exponential
        initialDelay: PT2S

  - from: PriorityFilter.normal
    to: NormalSlackNotifier
    delivery:
      mode: durable
      packet: TransformedPayload
      retryPolicy:
        maxAttempts: 3
        backoff: exponential
        initialDelay: PT5S

components:
  packets: {}
```

**Why `durable` for the Slack notifications?** Sending to Slack is an external HTTP call. If the process crashes between the filter and the Slack send, you need the packet to survive and the call to be retried. `durable` ensures the notification is eventually delivered even after a crash.

**Why the `.urgent` and `.normal` notation?** When a router node has multiple outputs, you must address them explicitly using `NodeName.outputPort` syntax.
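To build intuition for what a `retryPolicy` like the one above implies, here is a standalone sketch that computes a delay schedule. It assumes exponential backoff doubles the initial delay on each attempt; the runtime's exact formula (multiplier, jitter, whether `maxAttempts` counts the first try) may differ:

```python
import re


def parse_iso_seconds(duration: str) -> float:
    """Parse a simple ISO-8601 seconds duration like 'PT2S' or 'PT0.5S'."""
    m = re.fullmatch(r"PT(\d+(?:\.\d+)?)S", duration)
    if not m:
        raise ValueError(f"unsupported duration: {duration}")
    return float(m.group(1))


def retry_delays(max_attempts: int, initial_delay: str) -> list[float]:
    """One delay per attempt, assuming a base-2 exponential schedule."""
    base = parse_iso_seconds(initial_delay)
    return [base * 2**i for i in range(max_attempts)]


# maxAttempts: 3, initialDelay: PT2S → delays of 2s, 4s, 8s under this model
assert retry_delays(3, "PT2S") == [2.0, 4.0, 8.0]
```

The `PT5S` initial delay on the normal edge spaces retries further apart, reflecting that non-urgent notifications can tolerate more latency.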
## Step 5: Add the Slack notifier nodes

```yaml
flowdsl: "1.0"

info:
  title: Webhook to Slack Router
  version: "1.0.0"

nodes:
  WebhookReceiver:
    operationId: receive_webhook
    kind: source
    outputs:
      out: { packet: WebhookPayload }

  JsonTransformer:
    operationId: transform_webhook_fields
    kind: transform
    inputs:
      in: { packet: WebhookPayload }
    outputs:
      out: { packet: TransformedPayload }

  PriorityFilter:
    operationId: filter_by_priority
    kind: router
    inputs:
      in: { packet: TransformedPayload }
    outputs:
      urgent: { packet: TransformedPayload }
      normal: { packet: TransformedPayload }

  UrgentSlackNotifier:
    operationId: notify_slack_urgent
    kind: action
    summary: Posts an alert to the #incidents Slack channel
    inputs:
      in: { packet: TransformedPayload }
    settings:
      slackChannel: "#incidents"
      mentionGroup: "@oncall"

  NormalSlackNotifier:
    operationId: notify_slack_normal
    kind: action
    summary: Posts a notification to the #notifications Slack channel
    inputs:
      in: { packet: TransformedPayload }
    settings:
      slackChannel: "#notifications"

edges:
  - from: WebhookReceiver
    to: JsonTransformer
    delivery:
      mode: direct
      packet: WebhookPayload

  - from: JsonTransformer
    to: PriorityFilter
    delivery:
      mode: direct
      packet: TransformedPayload

  - from: PriorityFilter.urgent
    to: UrgentSlackNotifier
    delivery:
      mode: durable
      packet: TransformedPayload
      retryPolicy:
        maxAttempts: 3
        backoff: exponential
        initialDelay: PT2S

  - from: PriorityFilter.normal
    to: NormalSlackNotifier
    delivery:
      mode: durable
      packet: TransformedPayload
      retryPolicy:
        maxAttempts: 3
        backoff: exponential
        initialDelay: PT5S

components:
  packets: {}
```

**Why `settings` on the Slack nodes?** Settings are static configuration injected into the node at initialization — Slack channel names don't change at runtime. This keeps the node reusable: the same `notify_slack_urgent` handler can serve any flow with any channel configured via `settings`.
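The reusability argument can be demonstrated with a toy handler: one class, two instances, each configured through different `settings` at init time. This is a hypothetical stand-in for a notifier node, not SDK code:

```python
class SlackNotifier:
    """Toy notifier: the target channel comes from static settings at init."""

    def init(self, settings: dict) -> None:
        self.channel = settings["slackChannel"]
        self.mention = settings.get("mentionGroup", "")

    def format_message(self, title: str) -> str:
        prefix = f"{self.mention} " if self.mention else ""
        return f"[{self.channel}] {prefix}{title}"


urgent = SlackNotifier()
urgent.init({"slackChannel": "#incidents", "mentionGroup": "@oncall"})
normal = SlackNotifier()
normal.init({"slackChannel": "#notifications"})

assert urgent.format_message("DB down") == "[#incidents] @oncall DB down"
assert normal.format_message("New signup") == "[#notifications] New signup"
```

The handler code never hard-codes a channel; only the flow document decides where messages land.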
## Step 6: Add packet schemas

```yaml
flowdsl: "1.0"

info:
  title: Webhook to Slack Router
  version: "1.0.0"

nodes:
  WebhookReceiver:
    operationId: receive_webhook
    kind: source
    outputs:
      out: { packet: WebhookPayload }

  JsonTransformer:
    operationId: transform_webhook_fields
    kind: transform
    inputs:
      in: { packet: WebhookPayload }
    outputs:
      out: { packet: TransformedPayload }

  PriorityFilter:
    operationId: filter_by_priority
    kind: router
    inputs:
      in: { packet: TransformedPayload }
    outputs:
      urgent: { packet: TransformedPayload }
      normal: { packet: TransformedPayload }

  UrgentSlackNotifier:
    operationId: notify_slack_urgent
    kind: action
    inputs:
      in: { packet: TransformedPayload }
    settings:
      slackChannel: "#incidents"
      mentionGroup: "@oncall"

  NormalSlackNotifier:
    operationId: notify_slack_normal
    kind: action
    inputs:
      in: { packet: TransformedPayload }
    settings:
      slackChannel: "#notifications"

edges:
  - from: WebhookReceiver
    to: JsonTransformer
    delivery:
      mode: direct
      packet: WebhookPayload

  - from: JsonTransformer
    to: PriorityFilter
    delivery:
      mode: direct
      packet: TransformedPayload

  - from: PriorityFilter.urgent
    to: UrgentSlackNotifier
    delivery:
      mode: durable
      packet: TransformedPayload
      retryPolicy:
        maxAttempts: 3
        backoff: exponential
        initialDelay: PT2S

  - from: PriorityFilter.normal
    to: NormalSlackNotifier
    delivery:
      mode: durable
      packet: TransformedPayload
      retryPolicy:
        maxAttempts: 3
        backoff: exponential
        initialDelay: PT5S

components:
  packets:
    WebhookPayload:
      type: object
      description: Raw webhook body from the external system
      properties:
        source:
          type: string
          description: Originating system name
        eventType:
          type: string
        priority:
          type: string
          enum: [P0, P1, P2, P3, P4]
        title:
          type: string
        body:
          type: string
        timestamp:
          type: string
          format: date-time
        metadata:
          type: object
          additionalProperties: true
      required: [source, eventType, priority, title, timestamp]

    TransformedPayload:
      type: object
      description: Normalized payload with extracted fields
      properties:
        id:
          type: string
          description: Unique event ID generated by the transformer
        source:
          type: string
        priority:
          type: string
          enum: [P0, P1, P2, P3, P4]
        isUrgent:
          type: boolean
          description: True for P0 and P1
        title:
          type: string
        body:
          type: string
        receivedAt:
          type: string
          format: date-time
      required: [id, source, priority, isUrgent, title, receivedAt]
```

## Step 7: Validate

```bash
# Download the FlowDSL schema
curl -o flowdsl.schema.json https://flowdsl.com/schemas/v1/flowdsl.schema.json

# Validate with ajv-cli
npx ajv-cli validate -s flowdsl.schema.json -d webhook-router.flowdsl.yaml
```

Expected output:

```text
webhook-router.flowdsl.yaml valid
```

## Step 8: Load into Studio

Drag `webhook-router.flowdsl.yaml` into the Studio canvas. You'll see the five nodes laid out on the canvas. Click any edge to inspect its delivery policy. Click any node to see its input/output ports and settings.

## Summary

| Step | What you added                          | Why                                         |
| ---- | --------------------------------------- | ------------------------------------------- |
| 1    | Document skeleton                       | Foundation with metadata                    |
| 2    | WebhookReceiver source                  | Entry point for external events             |
| 3    | JsonTransformer + `direct` edge         | Fast in-process field extraction            |
| 4    | PriorityFilter router + `durable` edges | Content-based routing with durable delivery |
| 5    | Slack notifier nodes                    | Terminal action nodes with static config    |
| 6    | Packet schemas                          | Typed contracts for runtime validation      |
| 7    | Validation                              | Schema conformance check                    |

## Next steps

- [Email Triage Flow](https://flowdsl.com/docs/tutorials/email-triage-flow) — a stateful LLM-powered workflow with idempotency
- [Delivery Modes](https://flowdsl.com/docs/concepts/delivery-modes) — the five modes in depth
- [Write a Go Node](https://flowdsl.com/docs/tutorials/writing-a-go-node) — implement the `filter_by_priority` handler