Documentation Index
Fetch the complete documentation index at: https://docs.hexr.dev/llms.txt
Use this file to discover all available pages before exploring further.
hexr.a2a provides typed, authenticated communication between agents running on Hexr. Each agent-to-agent connection uses mutual TLS via SPIFFE identities on both sides — your agent proves who it is, and the remote agent does the same. The A2AClient handles task lifecycle (submission, polling, cancellation) and returns structured results with typed artifacts. On the receiving side, enabling a2a=True on @hexr_agent starts a bridge that routes incoming messages to your function automatically.
Quick start
Sending a message
from hexr.a2a import A2AClient, Message, TextPart
async with A2AClient("http://content-crew-a2a.tenant-acme.svc") as client:
# Discover the remote agent
card = await client.discover()
print(f"Agent: {card.name}, Skills: {[s.name for s in card.skills]}")
# Send a message
task = await client.send(Message(
parts=[TextPart(text="Write a blog post about AI agents")]
))
print(f"Task: {task.id}, State: {task.state}")
# Task: task_abc123, State: completed
for artifact in task.artifacts:
print(artifact.parts[0].text)
Receiving messages (A2A bridge)
When you set a2a=True on @hexr_agent, the bridge is started automatically. Your function is called when another agent sends a message:
@hexr_agent(
name="content-crew",
tenant="acme-corp",
a2a=True,
skills=[{"id": "write", "name": "Write", "description": "Write content"}]
)
def handle_request(message: str) -> str:
return f"Here's your content about: {message}"
A2AClient
from hexr.a2a import A2AClient
client = A2AClient(
base_url="http://agent-name-a2a.tenant-namespace.svc",
timeout=300, # Max wait for task completion
max_retries=3, # Retry on transient failures
card_cache_ttl=300 # Cache agent card for 5 minutes
)
Methods
Fetch the remote agent’s Agent Card. Cached for card_cache_ttl seconds.card = await client.discover()
print(card.name) # "content-crew"
print(card.description) # "Multi-agent content creation crew"
print(card.skills) # [Skill(id="write", name="Write", ...)]
print(card.capabilities) # {streaming: true, ...}
Returns: AgentCard
Send a message to the remote agent. Blocks until the task reaches a terminal state (completed, failed, or canceled).task = await client.send(Message(
parts=[TextPart(text="Analyze Q4 financials")]
))
if task.state == TaskState.COMPLETED:
result = task.artifacts[0].parts[0].text
elif task.state == TaskState.FAILED:
error = task.artifacts[0].parts[0].text
Returns: Task
Poll a task’s current state without blocking.task = await client.get_task("task_abc123")
print(task.state) # submitted | working | completed | failed | canceled
Request cooperative cancellation. The remote agent should check for cancellation and stop gracefully.task = await client.cancel_task("task_abc123")
Data models
Message
Messages carry one or more parts, each with a different content type:
from hexr.a2a import Message, TextPart, DataPart, FilePart, FileContent
# Text message
msg = Message(parts=[TextPart(text="Hello")])
# Structured data
msg = Message(parts=[DataPart(data={"query": "analyze", "sector": "tech"})])
# File attachment
msg = Message(parts=[FilePart(file=FileContent(
name="report.pdf",
mimeType="application/pdf",
bytes="base64-encoded-content"
))])
Task states
Tasks follow a linear lifecycle. INPUT_REQUIRED is a special state where the remote agent needs more information:
from hexr.a2a import TaskState
# Lifecycle: submitted → working → completed | failed | canceled
TaskState.SUBMITTED # Task received, not yet started
TaskState.WORKING # Agent is processing
TaskState.COMPLETED # Successfully finished (has artifacts)
TaskState.FAILED # Error occurred
TaskState.CANCELED # Cooperative cancellation
TaskState.INPUT_REQUIRED # Agent needs more info (question in artifacts)
AgentCard
card = await client.discover()
card.name # Agent name
card.description # Human-readable description
card.url # Agent A2A endpoint URL
card.version # Agent version
card.skills # List of skills [{id, name, description}]
card.capabilities # {streaming, pushNotifications, stateTransitionHistory}
Patterns
Fan-out / fan-in (orchestrator)
Call multiple specialized agents in parallel and synthesize their results:
@hexr_agent(name="orchestrator", tenant="acme-corp", a2a=True)
async def orchestrate(question: str) -> str:
import asyncio
# Fan-out: call multiple agents in parallel
async with A2AClient("http://researcher-a2a.tenant-acme.svc") as researcher, \
A2AClient("http://analyst-a2a.tenant-acme.svc") as analyst:
research_task, analysis_task = await asyncio.gather(
researcher.send(Message(parts=[TextPart(text=question)])),
analyst.send(Message(parts=[TextPart(text=question)]))
)
# Fan-in: combine results
research = research_task.artifacts[0].parts[0].text
analysis = analysis_task.artifacts[0].parts[0].text
client = hexr_llm(openai.OpenAI())
synthesis = client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": f"Synthesize:\n{research}\n{analysis}"}]
)
return synthesis.choices[0].message.content
Pipeline (sequential)
Pass results from one agent to the next in a chain:
@hexr_agent(name="pipeline", tenant="acme-corp", a2a=True)
async def pipeline(data: str) -> str:
# Step 1: Process
async with A2AClient("http://processor-a2a.tenant-acme.svc") as proc:
processed = await proc.send(Message(parts=[TextPart(text=data)]))
# Step 2: Review
async with A2AClient("http://reviewer-a2a.tenant-acme.svc") as rev:
reviewed = await rev.send(Message(
parts=[TextPart(text=processed.artifacts[0].parts[0].text)]
))
return reviewed.artifacts[0].parts[0].text
Discovery
Agents are discovered via Kubernetes DNS and Agent Cards. The URL format follows a consistent convention:
# DNS format: {agent-name}-a2a.{namespace}.svc
http://content-crew-a2a.tenant-acme-corp.svc
# Agent Card endpoint (discovered automatically)
GET /.well-known/agent.json → AgentCard JSON
All A2A communication travels over mTLS. Both the sender and receiver must have valid SPIFFE identities issued by the same trust domain.
Task guarantees
Hexr A2A provides the following reliability guarantees for task lifecycle:
- Idempotent submission — duplicate task submissions are safe; the same task ID is returned
- Automatic cleanup — completed tasks expire after 24 hours by default
- Cooperative cancellation — long-running agents should check for cancellation and stop gracefully