Reference AsyncAPI Messages in FlowDSL
FlowDSL is self-contained, but if your team already has AsyncAPI documents describing your event contracts, you can reference those schemas directly in FlowDSL rather than duplicating them. This tutorial shows how.
Why reference AsyncAPI?
If your team maintains an AsyncAPI document for your event bus, it is the authoritative schema for your events. Duplicating those schemas in FlowDSL creates drift — two definitions that can fall out of sync. Referencing them directly means:
- One source of truth
- FlowDSL validates messages against the actual AsyncAPI-defined schema
- Changes to the AsyncAPI schema automatically apply to the FlowDSL flow
- Studio can show the resolved schema in the inspector
Your AsyncAPI document
# events.asyncapi.yaml
asyncapi: "2.6.0"
info:
title: Support Events
version: "1.0.0"
channels:
support/email-received:
subscribe:
operationId: email_received
message:
$ref: "#/components/messages/EmailReceived"
components:
messages:
EmailReceived:
name: EmailReceived
payload:
type: object
properties:
messageId:
type: string
from:
type: string
format: email
subject:
type: string
body:
type: string
receivedAt:
type: string
format: date-time
required: [messageId, from, subject, body, receivedAt]
TicketCreated:
name: TicketCreated
payload:
type: object
properties:
ticketId:
type: string
emailMessageId:
type: string
priority:
type: string
enum: [urgent, normal, low]
status:
type: string
enum: [open, pending, resolved]
required: [ticketId, emailMessageId, priority, status]
Reference it in FlowDSL
Declare the AsyncAPI document under externalDocs, then wrap the message ref in a components.events definition to keep node ports stable:
flowdsl: "1.0.0"
info:
title: Email Triage
version: "1.0.0"
# Declare the AsyncAPI document location
externalDocs:
asyncapi: "./events.asyncapi.yaml"
description: AsyncAPI event contracts for the support system
flows:
email_triage:
entrypoints:
- message:
$ref: "#/components/events/EmailReceived"
nodes:
email_receiver:
$ref: "#/components/nodes/EmailReceiverNode"
classify_email:
$ref: "#/components/nodes/ClassifyEmailNode"
create_ticket:
$ref: "#/components/nodes/CreateTicketNode"
edges:
- from: email_receiver
to: classify_email
delivery:
mode: durable
store: mongo
- from: classify_email
to: create_ticket
delivery:
mode: durable
store: mongo
components:
events:
# Boundary events — payload schemas owned by AsyncAPI
EmailReceived:
name: EmailReceived
version: "1.0.0"
payload:
$ref: "asyncapi#/components/messages/EmailReceived"
TicketCreated:
name: TicketCreated
version: "1.0.0"
payload:
$ref: "asyncapi#/components/messages/TicketCreated"
packets:
# Internal packet — not in AsyncAPI
AnalysisResult:
type: object
properties:
email:
type: object
classification:
type: string
enum: [urgent, normal, spam]
confidence:
type: number
required: [email, classification, confidence]
nodes:
EmailReceiverNode:
operationId: receive_email
kind: source
runtime:
language: go
handler: nodes.EmailReceiverNode
outputs:
- name: EmailReceived
message:
$ref: "#/components/events/EmailReceived"
ClassifyEmailNode:
operationId: classify_email
kind: llm
runtime:
language: python
handler: nodes.ClassifyEmailNode
inputs:
- message:
$ref: "#/components/events/EmailReceived"
outputs:
- name: Classified
message:
schema:
$ref: "#/components/packets/AnalysisResult"
CreateTicketNode:
operationId: create_ticket
kind: action
runtime:
language: go
handler: nodes.CreateTicketNode
inputs:
- message:
schema:
$ref: "#/components/packets/AnalysisResult"
outputs:
- name: TicketCreated
message:
$ref: "#/components/events/TicketCreated"
The reference syntax
| Syntax | Resolves to |
|---|---|
asyncapi#/components/messages/EmailReceived | The payload schema of EmailReceived in the linked AsyncAPI doc (default doc) |
asyncapi:support#/components/messages/EmailReceived | Same, from a named doc declared as support: in externalDocs.asyncapi |
#/components/events/EmailReceived | A FlowDSL event defined in components.events |
#/components/packets/AnalysisResult | A packet schema defined in components.packets |
The externalDocs.asyncapi ref lives inside the event's payload.$ref, keeping all node ports using stable FlowDSL-local #/components/events/... paths.
How the runtime resolves references
- At startup, the runtime reads
externalDocs.asyncapito locate the AsyncAPI document(s). - For each
asyncapi#/...payload ref, the runtime resolves the JSON Pointer in the loaded doc. - The resolved JSON Schema is used for message validation at runtime.
- If a referenced document is unavailable, the runtime fails to start.
Multiple AsyncAPI documents
If your flows span multiple services, you can reference several AsyncAPI documents by name:
externalDocs:
asyncapi:
default: "./events.asyncapi.yaml"
notifications: "https://notify.example.com/asyncapi.json"
Then use asyncapi:notifications#/... in event payload refs:
components:
events:
EmailSent:
payload:
$ref: "asyncapi:notifications#/components/messages/EmailSent"
Validation
Both documents validate independently:
# Validate the AsyncAPI document
asyncapi validate events.asyncapi.yaml
# Validate the FlowDSL document
flowdsl validate email-triage.flowdsl.yaml
The FlowDSL validator also resolves and validates all asyncapi#/... references — it will fail if the referenced path doesn't exist in the AsyncAPI document.
# events.asyncapi.yaml
asyncapi: "2.6.0"
info:
title: Support Events
version: "1.0.0"
channels:
support/email-received:
subscribe:
operationId: email_received
message:
$ref: "#/components/messages/EmailReceived"
components:
messages:
EmailReceived:
name: EmailReceived
payload:
type: object
properties:
messageId:
type: string
from:
type: string
format: email
subject:
type: string
body:
type: string
receivedAt:
type: string
format: date-time
required: [messageId, from, subject, body, receivedAt]
TicketCreated:
name: TicketCreated
payload:
type: object
properties:
ticketId:
type: string
emailMessageId:
type: string
priority:
type: string
enum: [urgent, normal, low]
status:
type: string
enum: [open, pending, resolved]
required: [ticketId, emailMessageId, priority, status]
Reference it in FlowDSL
flowdsl: "1.0"
info:
title: Email Triage
version: "1.0.0"
# Point to the AsyncAPI document
asyncapi: "./events.asyncapi.yaml"
externalDocs:
url: ./events.asyncapi.yaml
description: AsyncAPI event contracts for the support system
nodes:
EmailReceiver:
operationId: receive_email
kind: source
outputs:
out:
# Reference the AsyncAPI message directly
packet: "asyncapi#/components/messages/EmailReceived"
ClassifyEmail:
operationId: classify_email
kind: llm
inputs:
in:
packet: "asyncapi#/components/messages/EmailReceived"
outputs:
out:
# Native packet for the internal analysis result
packet: AnalysisResult
CreateTicket:
operationId: create_ticket
kind: action
inputs:
in:
packet: AnalysisResult
outputs:
out:
packet: "asyncapi#/components/messages/TicketCreated"
edges:
- from: EmailReceiver
to: ClassifyEmail
delivery:
mode: durable
packet: "asyncapi#/components/messages/EmailReceived"
- from: ClassifyEmail
to: CreateTicket
delivery:
mode: durable
packet: AnalysisResult
components:
packets:
# Native packet for data that doesn't exist in AsyncAPI
AnalysisResult:
type: object
properties:
email:
type: object
classification:
type: string
enum: [urgent, normal, spam]
confidence:
type: number
required: [email, classification, confidence]
The reference syntax
| Syntax | Resolves to |
|---|---|
asyncapi#/components/messages/EmailReceived | The payload schema of the EmailReceived message in the linked AsyncAPI doc |
MyPacket | A packet defined in components.packets.MyPacket in the current FlowDSL document |
The # is a JSON Pointer fragment. The path after # is resolved within the AsyncAPI document.
How the runtime resolves references
- At startup, the runtime reads the
asyncapifield to locate the AsyncAPI document. - For each edge with an
asyncapi#/...packet reference, the runtime resolves the JSON Pointer in the loaded AsyncAPI document. - The resolved JSON Schema is used for packet validation at runtime.
- If the AsyncAPI document is unavailable, the runtime fails to start.
Mixed native and AsyncAPI packets
You can freely mix native packets and AsyncAPI references in the same flow:
components:
packets:
# This packet doesn't exist in AsyncAPI — define it natively
InternalAnalysis:
type: object
properties:
urgencyScore: { type: number }
categories: { type: array, items: { type: string } }
edges:
- from: EmailReceiver
to: Analyzer
delivery:
packet: "asyncapi#/components/messages/EmailReceived" # from AsyncAPI
- from: Analyzer
to: Router
delivery:
packet: InternalAnalysis # native packet
Validation
Both documents validate independently:
# Validate the AsyncAPI document
asyncapi validate events.asyncapi.yaml
# Validate the FlowDSL document
flowdsl validate email-triage.flowdsl.yaml
The FlowDSL validator also resolves and validates all asyncapi#/... references — it will fail if the referenced path doesn't exist in the AsyncAPI document.
What happens when AsyncAPI schemas change
Non-breaking changes (adding optional fields): FlowDSL continues to work. Existing packets pass validation. Node handlers that don't read the new field are unaffected.
Breaking changes (removing required fields, renaming fields): Packet validation at the edge will fail for packets that no longer conform to the updated schema. The runtime will reject those packets and move them to the dead letter queue.
events.asyncapi.v2.yaml instead of overwriting events.asyncapi.yaml when making breaking changes. Reference the specific version in your FlowDSL document.Summary
- Set
asyncapi: "./path/to/asyncapi.yaml"in the FlowDSL document to link the AsyncAPI file. - Reference messages with
asyncapi#/components/messages/MessageNameon any packet field. - Mix native
components.packetswith AsyncAPI references freely. - Both documents validate independently; FlowDSL also validates the reference paths.
Next steps
- AsyncAPI Integration guide — full integration guide with schema evolution
- Packets concept — native packet definitions
- Email Triage Flow — complete email flow using AsyncAPI references

