from typing import Annotated
from a2a.types import Message
from agentstack_sdk.a2a.extensions.common.form import FormRender, TextField
from agentstack_sdk.a2a.extensions.ui.form_request import (
FormRequestExtensionServer,
FormRequestExtensionSpec,
)
from agentstack_sdk.a2a.types import AgentMessage
from agentstack_sdk.server import Server
from agentstack_sdk.server.context import RunContext
server = Server()
@server.agent()
async def form_agent(
input: Message,
context: RunContext,
form_request: Annotated[FormRequestExtensionServer, FormRequestExtensionSpec()]
):
"""Agent that pauses execution to request user input"""
yield AgentMessage(text="I need some information from you.")
# Execution pauses here - task enters input_required state
# User fills out the form in the UI
form_data = await form_request.request_form(
form=FormRender(
title="Please provide your details",
fields=[
TextField(id="name", label="Your Name"),
TextField(id="email", label="Email Address"),
],
)
)
# Execution resumes after user submits the form
if form_data and form_data.values:
name = form_data.values["name"].value
email = form_data.values["email"].value
yield AgentMessage(text=f"Thank you, {name}! I'll contact you at {email}.")
else:
yield AgentMessage(text="Form was not filled out.")