Skip to main content
hexr.a2a provides typed, authenticated communication between agents running on Hexr. Each agent-to-agent connection uses mutual TLS via SPIFFE identities on both sides — your agent proves who it is, and the remote agent does the same. The A2AClient handles task lifecycle (submission, polling, cancellation) and returns structured results with typed artifacts. On the receiving side, enabling a2a=True on @hexr_agent starts a bridge that routes incoming messages to your function automatically.

Quick start

Sending a message

from hexr.a2a import A2AClient, Message, TextPart

async with A2AClient("http://content-crew-a2a.tenant-acme.svc") as client:
    # Discover the remote agent
    card = await client.discover()
    print(f"Agent: {card.name}, Skills: {[s.name for s in card.skills]}")
    
    # Send a message
    task = await client.send(Message(
        parts=[TextPart(text="Write a blog post about AI agents")]
    ))
    
    print(f"Task: {task.id}, State: {task.state}")
    # Task: task_abc123, State: completed
    
    for artifact in task.artifacts:
        print(artifact.parts[0].text)

Receiving messages (A2A bridge)

When you set a2a=True on @hexr_agent, the bridge is started automatically. Your function is called when another agent sends a message:
@hexr_agent(
    name="content-crew",
    tenant="acme-corp",
    a2a=True,
    skills=[{"id": "write", "name": "Write", "description": "Write content"}]
)
def handle_request(message: str) -> str:
    return f"Here's your content about: {message}"

A2AClient

from hexr.a2a import A2AClient

client = A2AClient(
    base_url="http://agent-name-a2a.tenant-namespace.svc",
    timeout=300,          # Max wait for task completion
    max_retries=3,        # Retry on transient failures
    card_cache_ttl=300    # Cache agent card for 5 minutes
)

Methods

discover()
async method
Fetch the remote agent’s Agent Card. Cached for card_cache_ttl seconds.
card = await client.discover()
print(card.name)          # "content-crew"
print(card.description)   # "Multi-agent content creation crew"
print(card.skills)        # [Skill(id="write", name="Write", ...)]
print(card.capabilities)  # {streaming: true, ...}
Returns: AgentCard
send(message)
async method
Send a message to the remote agent. Blocks until the task reaches a terminal state (completed, failed, or canceled).
task = await client.send(Message(
    parts=[TextPart(text="Analyze Q4 financials")]
))

if task.state == TaskState.COMPLETED:
    result = task.artifacts[0].parts[0].text
elif task.state == TaskState.FAILED:
    error = task.artifacts[0].parts[0].text
Returns: Task
get_task(task_id)
async method
Poll a task’s current state without blocking.
task = await client.get_task("task_abc123")
print(task.state)  # submitted | working | completed | failed | canceled
cancel_task(task_id)
async method
Request cooperative cancellation. The remote agent should check for cancellation and stop gracefully.
task = await client.cancel_task("task_abc123")

Data models

Message

Messages carry one or more parts, each with a different content type:
from hexr.a2a import Message, TextPart, DataPart, FilePart, FileContent

# Text message
msg = Message(parts=[TextPart(text="Hello")])

# Structured data
msg = Message(parts=[DataPart(data={"query": "analyze", "sector": "tech"})])

# File attachment
msg = Message(parts=[FilePart(file=FileContent(
    name="report.pdf",
    mimeType="application/pdf",
    bytes="base64-encoded-content"
))])

Task states

Tasks follow a linear lifecycle. INPUT_REQUIRED is a special state where the remote agent needs more information:
from hexr.a2a import TaskState

# Lifecycle: submitted → working → completed | failed | canceled
TaskState.SUBMITTED        # Task received, not yet started
TaskState.WORKING          # Agent is processing
TaskState.COMPLETED        # Successfully finished (has artifacts)
TaskState.FAILED           # Error occurred
TaskState.CANCELED         # Cooperative cancellation
TaskState.INPUT_REQUIRED   # Agent needs more info (question in artifacts)

AgentCard

card = await client.discover()

card.name           # Agent name
card.description    # Human-readable description
card.url            # Agent A2A endpoint URL
card.version        # Agent version
card.skills         # List of skills [{id, name, description}]
card.capabilities   # {streaming, pushNotifications, stateTransitionHistory}

Patterns

Fan-out / fan-in (orchestrator)

Call multiple specialized agents in parallel and synthesize their results:
@hexr_agent(name="orchestrator", tenant="acme-corp", a2a=True)
async def orchestrate(question: str) -> str:
    import asyncio
    
    # Fan-out: call multiple agents in parallel
    async with A2AClient("http://researcher-a2a.tenant-acme.svc") as researcher, \
               A2AClient("http://analyst-a2a.tenant-acme.svc") as analyst:
        
        research_task, analysis_task = await asyncio.gather(
            researcher.send(Message(parts=[TextPart(text=question)])),
            analyst.send(Message(parts=[TextPart(text=question)]))
        )
    
    # Fan-in: combine results
    research = research_task.artifacts[0].parts[0].text
    analysis = analysis_task.artifacts[0].parts[0].text
    
    client = hexr_llm(openai.OpenAI())
    synthesis = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": f"Synthesize:\n{research}\n{analysis}"}]
    )
    
    return synthesis.choices[0].message.content

Pipeline (sequential)

Pass results from one agent to the next in a chain:
@hexr_agent(name="pipeline", tenant="acme-corp", a2a=True)
async def pipeline(data: str) -> str:
    # Step 1: Process
    async with A2AClient("http://processor-a2a.tenant-acme.svc") as proc:
        processed = await proc.send(Message(parts=[TextPart(text=data)]))
    
    # Step 2: Review
    async with A2AClient("http://reviewer-a2a.tenant-acme.svc") as rev:
        reviewed = await rev.send(Message(
            parts=[TextPart(text=processed.artifacts[0].parts[0].text)]
        ))
    
    return reviewed.artifacts[0].parts[0].text

Discovery

Agents are discovered via Kubernetes DNS and Agent Cards. The URL format follows a consistent convention:
# DNS format: {agent-name}-a2a.{namespace}.svc
http://content-crew-a2a.tenant-acme-corp.svc

# Agent Card endpoint (discovered automatically)
GET /.well-known/agent.json → AgentCard JSON
All A2A communication travels over mTLS. Both the sender and receiver must have valid SPIFFE identities issued by the same trust domain.

Task guarantees

Hexr A2A provides the following reliability guarantees for task lifecycle:
  • Idempotent submission — duplicate task submissions are safe; the same task ID is returned
  • Automatic cleanup — completed tasks expire after 24 hours by default
  • Cooperative cancellation — long-running agents should check for cancellation and stop gracefully