Reflexion and Evolution Playbook
Implementing Self-Improving Agentic Systems
Document ID: A5-EVOLUTION-PLAYBOOK
Version: 1.0
Category: P1 - Implementation Guides
Audience: ML Engineers, Platform Architects, Research Engineers
Executive Summary
Evolution mechanisms transform static agents into continuously improving systems. This playbook covers five approaches: Reflexion, Continual Learning, Meta-Learning, Workflow Tuning, and Agentic Memory, with implementation patterns for each.
Key Finding: Shinn et al. (2023) demonstrated that Reflexion improves task success rates by 20-30% through verbal self-reflection, without requiring model fine-tuning.
Evolution Mechanism Overview
| Mechanism | Learning Signal | Update Target | Latency | Persistence |
|---|---|---|---|---|
| Reflexion | Verbal feedback | Episodic memory | Real-time | Session |
| Continual Learning | Task outcomes | Model weights | Hours-days | Permanent |
| Meta-Learning | Task distribution | Adaptation speed | Minutes | Temporary |
| Workflow Tuning | Performance metrics | Workflow config | Hours | Permanent |
| Agentic Memory | Experience patterns | Memory structure | Real-time | Permanent |
Part 1: Reflexion Implementation
Core Concept
Reflexion enables agents to learn from failures through verbal self-reflection stored in episodic memory, without model fine-tuning.
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ EXECUTE │────►│ EVALUATE │────►│ REFLECT │
│ Task │ │ Outcome │ │ on Failure │
└─────────────┘ └──────┬──────┘ └──────┬──────┘
│ │
│ ▼
│ ┌─────────────┐
└───────────►│ UPDATE │
│ Memory │
└─────────────┘
Implementation
import asyncio
import hashlib
import json
import time
import uuid
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional, Tuple
@dataclass
class ReflexionMemory:
"""Memory entry from reflexion."""
task_description: str
attempt_number: int
action_trajectory: List[Dict[str, Any]]
outcome: str
success: bool
reflection: str
lessons_learned: List[str]
timestamp: float = field(default_factory=time.time)
@dataclass
class ReflexionConfig:
max_attempts: int = 5
reflection_temperature: float = 0.7
memory_window: int = 10 # Recent memories to include
success_threshold: float = 0.8
class ReflexionAgent:
    """Agent that learns across attempts via Reflexion-style self-reflection.

    Each failed attempt is evaluated, reflected upon, and stored in episodic
    memory; subsequent attempts see those reflections in their prompt, so the
    agent improves within a session without any model fine-tuning.
    """

    def __init__(
        self,
        llm_client,
        tool_executor,
        config: Optional["ReflexionConfig"] = None,
    ):
        self.llm = llm_client
        self.tools = tool_executor
        self.config = config or ReflexionConfig()
        # Episodic reflexion memory, newest last (capped at 100 entries).
        self.memories: List["ReflexionMemory"] = []
        # Step-by-step record of the attempt currently executing.
        self.current_trajectory: List[Dict[str, Any]] = []

    async def execute_with_reflexion(
        self,
        task: str,
        evaluator: Optional[callable] = None,
    ) -> Dict[str, Any]:
        """Execute `task`, retrying with reflection up to config.max_attempts.

        Args:
            task: Natural-language task description.
            evaluator: Optional async callable ``(task, result) -> dict`` with
                at least a boolean "success" key; defaults to LLM
                self-evaluation.

        Returns:
            Dict with "success", "result", "attempts", and — when all
            attempts are exhausted — "final_reflection".
        """
        # BUG FIX: pre-initialize so max_attempts <= 0 (or an immediate
        # success on attempt 0) cannot raise UnboundLocalError below.
        result: Dict[str, Any] = {}
        reflection: Dict[str, Any] = {}
        for attempt in range(self.config.max_attempts):
            # Fresh trajectory for this attempt.
            self.current_trajectory = []
            relevant_memories = self._retrieve_relevant_memories(task)
            result = await self._execute_attempt(task, attempt, relevant_memories)
            # Evaluate outcome with the injected evaluator, or self-evaluate.
            if evaluator:
                evaluation = await evaluator(task, result)
            else:
                evaluation = await self._self_evaluate(task, result)
            if evaluation["success"]:
                await self._store_memory(
                    task, attempt, result, evaluation, success=True
                )
                return {
                    "success": True,
                    "result": result,
                    "attempts": attempt + 1,
                }
            # Failure: reflect, remember, retry.
            reflection = await self._generate_reflection(task, result, evaluation)
            await self._store_memory(
                task, attempt, result, evaluation,
                success=False, reflection=reflection,
            )
        # All attempts exhausted.
        return {
            "success": False,
            "result": result,
            "attempts": self.config.max_attempts,
            "final_reflection": reflection,
        }

    async def _execute_attempt(
        self,
        task: str,
        attempt: int,
        memories: List["ReflexionMemory"],
    ) -> Dict[str, Any]:
        """Run one ReAct-style attempt, recording each step in the trajectory."""
        memory_context = self._format_memories_for_prompt(memories)
        prompt = f"""
Task: {task}
Attempt: {attempt + 1}
{memory_context}
Based on any previous attempts and reflections, execute this task.
Think step by step and use available tools as needed.
"""
        messages = [{"role": "user", "content": prompt}]
        max_steps = 10  # Hard cap on ReAct iterations per attempt.
        response = None
        for step in range(max_steps):
            response = await self.llm.generate(
                messages=messages,
                tools=self.tools.get_schemas(),
                tool_choice="auto",
            )
            self.current_trajectory.append({
                "step": step,
                "thought": response.content,
                "tool_calls": response.tool_calls,
            })
            if not response.tool_calls:
                # No more tool calls: the model considers the task complete.
                break
            # BUG FIX: the assistant turn that issued the tool calls must be
            # appended to the transcript BEFORE the role:"tool" results, or
            # the message sequence is invalid for chat tool-calling APIs.
            messages.append({
                "role": "assistant",
                "content": response.content,
                "tool_calls": response.tool_calls,
            })
            for tool_call in response.tool_calls:
                tool_result = await self.tools.execute(
                    tool_call.name,
                    tool_call.arguments,
                )
                messages.append({
                    "role": "tool",
                    "tool_call_id": tool_call.id,
                    "content": json.dumps(tool_result),
                })
                # NOTE: only the last tool result of the step is kept, as in
                # the original design.
                self.current_trajectory[-1]["tool_result"] = tool_result
        return {
            "final_response": response.content if response else "",
            "trajectory": self.current_trajectory,
        }

    async def _self_evaluate(
        self,
        task: str,
        result: Dict[str, Any],
    ) -> Dict[str, Any]:
        """Ask the LLM to judge task completion; returns the parsed JSON verdict."""
        prompt = f"""
Evaluate whether this task was completed successfully.
Task: {task}
Execution result:
{json.dumps(result, indent=2)}
Provide evaluation as JSON:
{{
"success": true/false,
"score": 0.0-1.0,
"reasoning": "explanation",
"issues": ["list of issues if any"]
}}
"""
        response = await self.llm.generate(
            messages=[{"role": "user", "content": prompt}],
            temperature=0.3,  # Low temperature for consistent judgments.
            response_format="json",
        )
        return json.loads(response.content)

    async def _generate_reflection(
        self,
        task: str,
        result: Dict[str, Any],
        evaluation: Dict[str, Any],
    ) -> Dict[str, Any]:
        """Generate a structured reflection on a failed attempt."""
        prompt = f"""
Reflect on why this task attempt failed and what to do differently.
Task: {task}
Execution trajectory:
{json.dumps(self.current_trajectory, indent=2)}
Evaluation:
{json.dumps(evaluation, indent=2)}
Generate a reflection with:
1. What went wrong
2. Why it went wrong
3. Specific lessons for next attempt
4. Alternative approaches to try
Return as JSON:
{{
"what_went_wrong": "description",
"root_cause": "analysis",
"lessons": ["lesson1", "lesson2"],
"alternative_approaches": ["approach1", "approach2"],
"verbal_reflection": "A complete sentence reflection for memory"
}}
"""
        response = await self.llm.generate(
            messages=[{"role": "user", "content": prompt}],
            temperature=self.config.reflection_temperature,
            response_format="json",
        )
        return json.loads(response.content)

    async def _store_memory(
        self,
        task: str,
        attempt: int,
        result: Dict[str, Any],
        evaluation: Dict[str, Any],
        success: bool,
        reflection: Optional[Dict[str, Any]] = None,
    ):
        """Append this attempt to episodic memory (bounded at 100 entries).

        `result` is currently unused; the trajectory is taken from
        self.current_trajectory instead.
        """
        memory = ReflexionMemory(
            task_description=task,
            attempt_number=attempt,
            action_trajectory=self.current_trajectory,
            outcome=evaluation.get("reasoning", ""),
            success=success,
            reflection=reflection.get("verbal_reflection", "") if reflection else "",
            lessons_learned=reflection.get("lessons", []) if reflection else [],
        )
        self.memories.append(memory)
        # Keep only the most recent 100 memories.
        if len(self.memories) > 100:
            self.memories = self.memories[-100:]

    def _retrieve_relevant_memories(
        self,
        task: str,
    ) -> List["ReflexionMemory"]:
        """Return up to 5 recent memories worth surfacing for this task.

        Simple recency-based selection; production systems should rank by
        embedding similarity to `task` (which is why `task` is currently
        unused here).
        """
        relevant = []
        for memory in reversed(self.memories[-self.config.memory_window:]):
            # Failed attempts are useful only when they carry a reflection.
            if not memory.success and memory.reflection:
                relevant.append(memory)
            # Successful completions of similar tasks are always useful.
            elif memory.success:
                relevant.append(memory)
        return relevant[:5]  # Limit to 5 most relevant

    def _format_memories_for_prompt(
        self,
        memories: List["ReflexionMemory"],
    ) -> str:
        """Render memories as a plain-text section for the attempt prompt."""
        if not memories:
            return ""
        sections = ["Previous attempts and learnings:"]
        for mem in memories:
            if mem.success:
                sections.append(f"""
[SUCCESS] Task: {mem.task_description[:100]}
Approach that worked: {mem.outcome[:200]}
""")
            else:
                sections.append(f"""
[FAILED ATTEMPT] Task: {mem.task_description[:100]}
Reflection: {mem.reflection}
Lessons: {', '.join(mem.lessons_learned[:3])}
""")
        return "\n".join(sections)
Part 2: Continual Learning
Core Concept
Continual learning updates model parameters based on accumulated experience while avoiding catastrophic forgetting.
from dataclasses import dataclass
from typing import List, Dict, Tuple
import numpy as np
@dataclass
class ExperienceBuffer:
    """FIFO-bounded buffer of training experiences.

    Once the buffer holds more than `max_size` entries, the single oldest
    entry is evicted on each add. Sampling draws a uniform random subset
    without replacement.
    """

    max_size: int = 10000
    experiences: List[Dict] = field(default_factory=list)

    def add(self, experience: Dict):
        """Append one experience, evicting the oldest when over capacity."""
        self.experiences.append(experience)
        if len(self.experiences) > self.max_size:
            self.experiences.pop(0)  # Drop the single oldest entry.

    def sample(self, batch_size: int) -> List[Dict]:
        """Return up to `batch_size` distinct experiences chosen uniformly."""
        draw_count = min(batch_size, len(self.experiences))
        chosen_indices = np.random.choice(
            len(self.experiences),
            draw_count,
            replace=False,
        )
        return [self.experiences[idx] for idx in chosen_indices]
class ContinualLearningManager:
    """Manage continual learning (periodic fine-tuning) for agents.

    Experiences accumulate in a bounded buffer; after every
    `update_threshold` recorded experiences a fine-tuning job is launched
    that mixes the most recent experiences with a random replay sample of
    older ones, mitigating catastrophic forgetting.
    """

    def __init__(
        self,
        base_model: str,
        fine_tuning_api,
        experience_buffer: Optional["ExperienceBuffer"] = None,
    ):
        self.base_model = base_model
        self.fine_tuning_api = fine_tuning_api
        self.buffer = experience_buffer or ExperienceBuffer()
        # Model lineage, newest last, so bad updates can be rolled back.
        self.current_model = base_model
        self.model_history: List[str] = [base_model]
        # Learning configuration.
        self.update_threshold = 100  # Experiences between fine-tune runs.
        self.replay_ratio = 0.3  # Ratio of old experiences in training.
        # BUG FIX: count experiences independently of the buffer. The buffer
        # is capped at max_size, so once full, len(buffer.experiences) never
        # changes and the original len-based modulo check either fired on
        # every single add or never again.
        self._experience_count = 0

    def record_experience(
        self,
        task: str,
        trajectory: List[Dict],
        outcome: str,
        reward: float,
    ):
        """Record one experience; may schedule a background model update.

        NOTE: scheduling uses asyncio.create_task and therefore requires a
        running event loop in the calling thread.
        """
        experience = {
            "task": task,
            "trajectory": trajectory,
            "outcome": outcome,
            "reward": reward,
            "timestamp": time.time(),
        }
        self.buffer.add(experience)
        self._experience_count += 1
        # Trigger an update every `update_threshold` recorded experiences.
        if self._experience_count % self.update_threshold == 0:
            asyncio.create_task(self._trigger_update())

    async def _trigger_update(self):
        """Fine-tune the current model on recent plus replayed experiences."""
        recent = self.buffer.experiences[-self.update_threshold:]
        # Random replay of older experiences reduces catastrophic forgetting.
        replay = self.buffer.sample(
            int(self.update_threshold * self.replay_ratio)
        )
        training_data = recent + replay
        training_examples = self._convert_to_training_format(training_data)
        new_model = await self.fine_tuning_api.create_fine_tune(
            base_model=self.current_model,
            training_data=training_examples,
            hyperparameters={
                # Single epoch with reduced LR limits drift per update.
                "n_epochs": 1,
                "learning_rate_multiplier": 0.5,
            },
        )
        self.current_model = new_model
        self.model_history.append(new_model)

    def _convert_to_training_format(
        self,
        experiences: List[Dict],
    ) -> List[Dict]:
        """Convert positive-reward experiences into chat fine-tuning examples."""
        examples = []
        for exp in experiences:
            if exp["reward"] > 0.5:  # Only learn from positive experiences.
                # Treat the successful trajectory as a demonstration.
                messages = [
                    {"role": "user", "content": exp["task"]}
                ]
                for step in exp["trajectory"]:
                    if step.get("thought"):
                        messages.append({
                            "role": "assistant",
                            "content": step["thought"],
                        })
                examples.append({"messages": messages})
        return examples

    def rollback_model(self, steps: int = 1):
        """Roll back `steps` model versions (no-op when history is too short)."""
        if len(self.model_history) > steps:
            self.current_model = self.model_history[-(steps + 1)]
            self.model_history = self.model_history[:-(steps)]
Part 3: Meta-Learning (MAML-style)
Core Concept
Meta-learning enables rapid adaptation to new tasks by learning good starting points. Note: the reference implementation below is MAML-inspired rather than true gradient-based MAML — it transfers strategies between embedding-similar tasks instead of learning parameter initializations.
class MetaLearningAgent:
    """Agent that adapts quickly to new tasks via strategy transfer.

    Stores embeddings of previously seen tasks plus per-task strategy
    effectiveness scores; for a new task it retrieves strategies that worked
    on embedding-similar tasks and optionally adapts them using few-shot
    examples. (MAML-inspired in spirit, but similarity-based rather than
    gradient-based.)
    """

    def __init__(
        self,
        llm_client,
        embedding_service,
        strategy_store,
    ):
        self.llm = llm_client
        self.embeddings = embedding_service
        self.strategies = strategy_store
        # Meta-learned state: task_id -> embedding, and
        # task_id -> {strategy_id -> EMA effectiveness}.
        self.task_embeddings: Dict[str, List[float]] = {}
        self.strategy_effectiveness: Dict[str, Dict[str, float]] = {}

    async def adapt_to_task(
        self,
        task: str,
        few_shot_examples: Optional[List[Dict]] = None,
    ) -> Dict[str, Any]:
        """Rapidly adapt to a new task type.

        Returns a dict with the adapted strategy (may be None when nothing
        relevant is known), the similar tasks found, and a confidence score.
        """
        task_embedding = await self.embeddings.embed(task)
        # Find similar tasks we've seen before.
        similar_tasks = self._find_similar_tasks(task_embedding)
        # Get strategies that worked for those tasks.
        effective_strategies = self._get_effective_strategies(similar_tasks)
        # Adapt strategy with few-shot examples when available.
        if few_shot_examples:
            adapted_strategy = await self._adapt_from_examples(
                task,
                effective_strategies,
                few_shot_examples,
            )
        else:
            adapted_strategy = effective_strategies[0] if effective_strategies else None
        return {
            "task": task,
            "adapted_strategy": adapted_strategy,
            "similar_tasks": similar_tasks,
            "confidence": self._calculate_adaptation_confidence(
                similar_tasks, effective_strategies
            ),
        }

    def _calculate_adaptation_confidence(
        self,
        similar_tasks: List[Tuple[str, float]],
        strategies: List[Dict],
    ) -> float:
        """Heuristic confidence in [0, 1] for the adaptation.

        BUG FIX: this method was called by adapt_to_task but never defined.
        Confidence is the best similarity to a known task, clamped to [0, 1],
        halved when no usable strategy was found.
        """
        if not similar_tasks:
            return 0.0
        best = max(similarity for _, similarity in similar_tasks)
        confidence = min(1.0, max(0.0, float(best)))
        if not strategies or all(s is None for s in strategies):
            confidence *= 0.5
        return confidence

    def _find_similar_tasks(
        self,
        task_embedding: List[float],
        top_k: int = 5,
    ) -> List[Tuple[str, float]]:
        """Return (task_id, cosine similarity) of the top_k closest known tasks."""
        query = np.asarray(task_embedding, dtype=float)
        query_norm = np.linalg.norm(query)
        similarities = []
        for task_id, embedding in self.task_embeddings.items():
            stored = np.asarray(embedding, dtype=float)
            denom = query_norm * np.linalg.norm(stored)
            # Guard against zero-norm embeddings (division by zero).
            similarity = float(np.dot(query, stored) / denom) if denom else 0.0
            similarities.append((task_id, similarity))
        similarities.sort(key=lambda x: x[1], reverse=True)
        return similarities[:top_k]

    def _get_effective_strategies(
        self,
        similar_tasks: List[Tuple[str, float]],
    ) -> List[Dict]:
        """Rank strategies by similarity-weighted effectiveness; return top 3."""
        strategy_scores: Dict[str, float] = {}
        for task_id, similarity in similar_tasks:
            task_strategies = self.strategy_effectiveness.get(task_id, {})
            for strategy_id, effectiveness in task_strategies.items():
                weighted_score = similarity * effectiveness
                strategy_scores[strategy_id] = (
                    strategy_scores.get(strategy_id, 0) + weighted_score
                )
        sorted_strategies = sorted(
            strategy_scores.items(),
            key=lambda x: x[1],
            reverse=True,
        )
        return [
            self.strategies.get(s_id)
            for s_id, _ in sorted_strategies[:3]
        ]

    async def _adapt_from_examples(
        self,
        task: str,
        base_strategies: List[Dict],
        examples: List[Dict],
    ) -> Dict:
        """Use the LLM to adapt the most promising base strategy to this task."""
        prompt = f"""
Given these base strategies that worked for similar tasks:
{json.dumps(base_strategies, indent=2)}
And these examples of the current task:
{json.dumps(examples, indent=2)}
Adapt the most promising strategy to handle the current task:
{task}
Return the adapted strategy as JSON with:
- name: strategy name
- steps: list of steps
- tools_to_use: list of tool names
- success_criteria: how to know if successful
"""
        response = await self.llm.generate(
            messages=[{"role": "user", "content": prompt}],
            temperature=0.5,
            response_format="json",
        )
        return json.loads(response.content)

    def record_task_outcome(
        self,
        task: str,
        task_embedding: List[float],
        strategy_id: str,
        success: bool,
        effectiveness: float,
    ):
        """Record a task outcome for future strategy transfer.

        `success` is currently informational only; `effectiveness` drives
        the EMA update.
        """
        task_id = hashlib.md5(task.encode()).hexdigest()[:8]
        self.task_embeddings[task_id] = task_embedding
        if task_id not in self.strategy_effectiveness:
            self.strategy_effectiveness[task_id] = {}
        # Exponential moving average; unseen strategies start at 0.5.
        alpha = 0.3
        current = self.strategy_effectiveness[task_id].get(strategy_id, 0.5)
        self.strategy_effectiveness[task_id][strategy_id] = (
            alpha * effectiveness + (1 - alpha) * current
        )
Part 4: Workflow Tuning
Core Concept
Optimize workflow configurations based on performance metrics through systematic experimentation.
@dataclass
class WorkflowVariant:
"""A variant of a workflow configuration."""
id: str
config: Dict[str, Any]
metrics: Dict[str, float] = field(default_factory=dict)
sample_count: int = 0
class WorkflowTuner:
    """Optimize workflow configurations through randomized experimentation."""

    def __init__(
        self,
        base_workflow: Dict[str, Any],
        metric_collector,
        experiment_budget: int = 100,
    ):
        self.base_workflow = base_workflow
        self.metrics = metric_collector
        self.budget = experiment_budget  # NOTE(review): not yet enforced anywhere.
        # Variants under test, keyed by variant id.
        self.variants: Dict[str, "WorkflowVariant"] = {}
        # BUG FIX: register the unmodified workflow as the "base" variant so
        # get_improvement_report always has a real baseline — the original
        # looked up self.variants["base"] without ever creating it.
        self.variants["base"] = WorkflowVariant(id="base", config=dict(base_workflow))
        self.current_best: Optional[str] = None
        # Parameters eligible for random perturbation.
        self.tunable_params = self._identify_tunable_params()

    def _identify_tunable_params(self) -> List[Dict]:
        """Describe the parameters that can be tuned (name, type, range, step)."""
        return [
            {
                "name": "temperature",
                "type": "float",
                "range": [0.1, 1.0],
                "step": 0.1
            },
            {
                "name": "max_iterations",
                "type": "int",
                "range": [3, 10],
                "step": 1
            },
            {
                "name": "retrieval_top_k",
                "type": "int",
                "range": [3, 15],
                "step": 2
            },
            {
                "name": "confidence_threshold",
                "type": "float",
                "range": [0.5, 0.95],
                "step": 0.05
            }
        ]

    def generate_variants(self, num_variants: int = 5) -> List["WorkflowVariant"]:
        """Generate and register randomized workflow variants for testing.

        Each tunable parameter is perturbed with probability 0.5.
        NOTE(review): the declared "step" granularity is not honored —
        values are drawn continuously/uniformly over the range.
        """
        variants = []
        for i in range(num_variants):
            config = self.base_workflow.copy()
            for param in self.tunable_params:
                if np.random.random() > 0.5:
                    if param["type"] == "float":
                        value = np.random.uniform(
                            param["range"][0],
                            param["range"][1],
                        )
                    else:
                        # randint upper bound is exclusive, hence the +1.
                        value = np.random.randint(
                            param["range"][0],
                            param["range"][1] + 1,
                        )
                    config[param["name"]] = value
            variant = WorkflowVariant(
                id=f"variant_{i}_{int(time.time())}",
                config=config,
            )
            variants.append(variant)
            self.variants[variant.id] = variant
        return variants

    async def run_experiment(
        self,
        variant_id: str,
        task: str,
        executor: callable,
    ) -> Dict[str, float]:
        """Execute `task` under a variant's config and fold metrics into it.

        Raises KeyError if `variant_id` was never registered.
        """
        variant = self.variants[variant_id]
        start_time = time.time()
        result = await executor(task, variant.config)
        duration = time.time() - start_time
        metrics = {
            "success": 1.0 if result.get("success") else 0.0,
            "duration": duration,
            "token_usage": result.get("token_usage", 0),
            "tool_calls": result.get("tool_calls", 0),
        }
        # Fold into per-variant running averages.
        variant.sample_count += 1
        for key, value in metrics.items():
            if key not in variant.metrics:
                variant.metrics[key] = value
            else:
                n = variant.sample_count
                variant.metrics[key] = (
                    variant.metrics[key] * (n - 1) + value
                ) / n
        return metrics

    def select_best_variant(
        self,
        optimization_target: str = "success",
        constraints: Dict[str, float] = None,
    ) -> Optional["WorkflowVariant"]:
        """Pick the variant with the highest `optimization_target` metric.

        Variants with fewer than 5 samples are excluded; `constraints` are
        upper bounds (metric <= value). NOTE(review): higher-is-better is
        assumed for the target — do not pass "duration" here without
        inverting it.
        """
        valid_variants = []
        for variant in self.variants.values():
            if variant.sample_count < 5:
                continue
            if constraints:
                meets_constraints = all(
                    variant.metrics.get(key, float('inf')) <= value
                    for key, value in constraints.items()
                )
                if not meets_constraints:
                    continue
            valid_variants.append(variant)
        if not valid_variants:
            return None
        valid_variants.sort(
            key=lambda v: v.metrics.get(optimization_target, 0),
            reverse=True,
        )
        self.current_best = valid_variants[0].id
        return valid_variants[0]

    def get_improvement_report(self) -> Dict[str, Any]:
        """Report the best variant's metrics relative to the baseline."""
        if not self.current_best:
            return {"status": "no_best_found"}
        best = self.variants[self.current_best]
        # Compare against the registered baseline; fall back to an arbitrary
        # variant only if "base" was somehow removed.
        baseline = self.variants.get("base") or next(iter(self.variants.values()))
        improvements = {}
        for key in best.metrics:
            if key in baseline.metrics:
                change = (
                    (best.metrics[key] - baseline.metrics[key]) /
                    max(baseline.metrics[key], 0.001)
                ) * 100
                improvements[key] = f"{change:+.1f}%"
        return {
            "best_variant": self.current_best,
            "best_config": best.config,
            "best_metrics": best.metrics,
            "improvements": improvements,
            "sample_count": best.sample_count,
        }
Part 5: Agentic Memory (A-Mem)
Core Concept
Self-organizing memory that evolves its structure based on usage patterns and agent needs.
@dataclass
class MemoryNode:
"""A node in the agentic memory graph."""
id: str
content: str
node_type: str # "episodic", "semantic", "procedural"
embedding: List[float]
importance: float
access_count: int = 0
last_accessed: float = field(default_factory=time.time)
connections: Dict[str, float] = field(default_factory=dict) # node_id -> weight
class AgenticMemory:
    """Self-organizing memory graph (A-Mem style).

    Nodes carry embeddings and weighted connections. Storage links new nodes
    to similar existing ones, retrieval combines cosine similarity with
    spreading activation, and consolidation decays importance, merges
    near-duplicates, and prunes weak edges.
    """

    def __init__(
        self,
        embedding_service,
        llm_client,
        decay_rate: float = 0.01,
    ):
        self.embeddings = embedding_service
        self.llm = llm_client
        self.decay_rate = decay_rate  # Per-day importance decay coefficient.
        # Memory graph: node_id -> MemoryNode.
        self.nodes: Dict[str, "MemoryNode"] = {}
        # Reserved for an ANN index; a linear scan is used for now.
        self.embedding_index = {}

    async def store(
        self,
        content: str,
        node_type: str,
        importance: float = 0.5,
        related_to: Optional[List[str]] = None,
    ) -> str:
        """Store a new memory, wire it into the graph, and consolidate.

        Returns the id of the newly created node.
        """
        embedding = await self.embeddings.embed(content)
        node_id = str(uuid.uuid4())[:8]
        node = MemoryNode(
            id=node_id,
            content=content,
            node_type=node_type,
            embedding=embedding,
            importance=importance,
        )
        # Bidirectional links to sufficiently similar existing nodes.
        similar_nodes = await self._find_similar_nodes(embedding, top_k=5)
        for similar_id, similarity in similar_nodes:
            if similarity > 0.5:
                node.connections[similar_id] = similarity
                self.nodes[similar_id].connections[node_id] = similarity
        # Strong fixed-weight links to explicitly related nodes.
        if related_to:
            for related_id in related_to:
                if related_id in self.nodes:
                    node.connections[related_id] = 0.8
                    self.nodes[related_id].connections[node_id] = 0.8
        self.nodes[node_id] = node
        # NOTE(review): consolidating on every store is O(n^2) in node count;
        # consider throttling for large memories.
        await self._consolidate_memories()
        return node_id

    async def _find_similar_nodes(
        self,
        embedding: List[float],
        top_k: int = 5,
    ) -> List[Tuple[str, float]]:
        """Return (node_id, cosine similarity) for the top_k closest nodes.

        BUG FIX: this method was called by store() and retrieve() but never
        defined. Linear scan with a zero-norm guard.
        """
        query = np.asarray(embedding, dtype=float)
        query_norm = np.linalg.norm(query)
        scored: List[Tuple[str, float]] = []
        for node_id, node in self.nodes.items():
            stored = np.asarray(node.embedding, dtype=float)
            denom = query_norm * np.linalg.norm(stored)
            similarity = float(np.dot(query, stored) / denom) if denom else 0.0
            scored.append((node_id, similarity))
        scored.sort(key=lambda item: item[1], reverse=True)
        return scored[:top_k]

    async def retrieve(
        self,
        query: str,
        top_k: int = 5,
        node_types: Optional[List[str]] = None,
    ) -> List["MemoryNode"]:
        """Retrieve relevant memories using similarity + spreading activation.

        Access statistics of returned nodes are updated as a side effect.
        """
        query_embedding = await self.embeddings.embed(query)
        # Over-fetch candidates, then let activation re-rank.
        candidates = await self._find_similar_nodes(query_embedding, top_k=top_k * 2)
        activated = {}
        for node_id, similarity in candidates:
            # Base activation: similarity weighted by stored importance.
            activated[node_id] = similarity * self.nodes[node_id].importance
            # Spread half-strength activation along outgoing connections.
            for connected_id, weight in self.nodes[node_id].connections.items():
                spread_activation = activated[node_id] * weight * 0.5
                if connected_id not in activated:
                    activated[connected_id] = spread_activation
                else:
                    activated[connected_id] += spread_activation
        if node_types:
            activated = {
                k: v for k, v in activated.items()
                if self.nodes[k].node_type in node_types
            }
        sorted_nodes = sorted(
            activated.items(),
            key=lambda x: x[1],
            reverse=True,
        )[:top_k]
        results = []
        for node_id, _ in sorted_nodes:
            node = self.nodes[node_id]
            node.access_count += 1
            node.last_accessed = time.time()
            results.append(node)
        return results

    async def _consolidate_memories(self):
        """Decay importance, merge near-duplicates, and prune weak edges."""
        current_time = time.time()
        for node in self.nodes.values():
            time_since_access = current_time - node.last_accessed
            # 86400 s/day: decay_rate is expressed per day of inactivity.
            decay_factor = np.exp(-self.decay_rate * time_since_access / 86400)
            node.importance *= decay_factor
        await self._merge_similar_nodes()
        await self._strengthen_coactivation()
        self._prune_weak_connections()

    async def _merge_similar_nodes(self, threshold: float = 0.95):
        """Merge nearly identical memory nodes (O(n^2) pairwise scan)."""
        to_merge = []
        checked = set()
        for node_id, node in self.nodes.items():
            if node_id in checked:
                continue
            for other_id, other in self.nodes.items():
                if other_id == node_id or other_id in checked:
                    continue
                norms = (
                    np.linalg.norm(node.embedding) * np.linalg.norm(other.embedding)
                )
                similarity = (
                    float(np.dot(node.embedding, other.embedding) / norms)
                    if norms else 0.0
                )
                if similarity > threshold:
                    to_merge.append((node_id, other_id))
                    checked.add(other_id)
        for keep_id, merge_id in to_merge:
            await self._merge_nodes(keep_id, merge_id)

    async def _merge_nodes(self, keep_id: str, merge_id: str):
        """Merge `merge_id` into `keep_id`, then delete the merged node."""
        keep_node = self.nodes[keep_id]
        merge_node = self.nodes[merge_id]
        # Keep the stronger importance of the pair.
        keep_node.importance = max(keep_node.importance, merge_node.importance)
        # Union of connections, keeping the stronger weight on conflicts.
        for conn_id, weight in merge_node.connections.items():
            if conn_id != keep_id:
                if conn_id in keep_node.connections:
                    keep_node.connections[conn_id] = max(
                        keep_node.connections[conn_id], weight
                    )
                else:
                    keep_node.connections[conn_id] = weight
        # Re-point all inbound references from merge_id to keep_id.
        for node in self.nodes.values():
            if merge_id in node.connections:
                weight = node.connections.pop(merge_id)
                if keep_id not in node.connections:
                    node.connections[keep_id] = weight
        del self.nodes[merge_id]

    async def _strengthen_coactivation(self):
        """Strengthen connections between frequently co-accessed nodes.

        BUG FIX (stub): this hook was called by _consolidate_memories but
        never defined, raising AttributeError. Co-access statistics are not
        tracked yet, so it is a documented no-op — TODO: record co-retrieval
        pairs in retrieve() and boost their edge weights here.
        """
        return None

    def _prune_weak_connections(self, threshold: float = 0.1):
        """Drop every connection whose weight has fallen below `threshold`."""
        for node in self.nodes.values():
            node.connections = {
                k: v for k, v in node.connections.items()
                if v >= threshold
            }

    async def _apply_evolution_suggestions(self, suggestions: Dict[str, Any]):
        """Conservatively apply LLM structure suggestions.

        BUG FIX: called by evolve_structure but never defined. Only
        importance adjustments (assumed {node_id: new_importance} — confirm
        against the prompt's JSON contract) and merges of existing node pairs
        are applied; suggested connections/splits are left to the caller.
        """
        adjustments = suggestions.get("importance_adjustments") or {}
        if isinstance(adjustments, dict):
            for node_id, new_importance in adjustments.items():
                if node_id in self.nodes:
                    self.nodes[node_id].importance = float(
                        min(1.0, max(0.0, new_importance))
                    )
        for pair in suggestions.get("suggested_merges") or []:
            if (
                isinstance(pair, (list, tuple)) and len(pair) == 2
                and pair[0] in self.nodes and pair[1] in self.nodes
            ):
                await self._merge_nodes(pair[0], pair[1])

    async def evolve_structure(self):
        """Ask the LLM to critique a sample of the graph; apply suggestions."""
        # Sample memory structure (first 20 nodes only).
        sample_nodes = list(self.nodes.values())[:20]
        prompt = f"""
Analyze this memory structure and suggest improvements:
Nodes: {json.dumps([{
    'id': n.id,
    'type': n.node_type,
    'content': n.content[:100],
    'importance': n.importance,
    'connections': len(n.connections)
} for n in sample_nodes], indent=2)}
Suggest:
1. Nodes that should be merged
2. Missing connections that should exist
3. Nodes that should be split into multiple concepts
4. Importance adjustments
Return as JSON with suggested_merges, suggested_connections,
suggested_splits, importance_adjustments.
"""
        response = await self.llm.generate(
            messages=[{"role": "user", "content": prompt}],
            temperature=0.3,
            response_format="json",
        )
        suggestions = json.loads(response.content)
        await self._apply_evolution_suggestions(suggestions)
        return suggestions
Part 6: Integration Patterns
Combining Evolution Mechanisms
class EvolvingAgent:
    """Agent that combines multiple evolution mechanisms.

    Wires Reflexion (per-attempt learning), meta-learning (strategy
    adaptation), agentic memory (episodic storage), and workflow tuning
    (periodic configuration experiments) around a single task execution.

    NOTE(review): several collaborators are constructed with None
    dependencies (embedding service, strategy store, metric collector) —
    this is illustrative wiring; inject real services in production.
    """

    def __init__(self, llm_client, tool_registry):
        self.llm = llm_client
        self.tools = tool_registry
        # Evolution components.
        self.reflexion = ReflexionAgent(llm_client, tool_registry)
        self.meta_learner = MetaLearningAgent(llm_client, None, None)
        self.memory = AgenticMemory(None, llm_client)
        self.workflow_tuner = WorkflowTuner({}, None)

    async def execute_and_evolve(
        self,
        task: str,
        allow_evolution: bool = True,
    ) -> Dict[str, Any]:
        """Execute `task` with the full evolution stack; optionally learn from it."""
        # 1. Meta-learning: adapt strategy from similar past tasks.
        adaptation = await self.meta_learner.adapt_to_task(task)
        # 2. Memory: retrieve relevant context (currently informational only).
        memories = await self.memory.retrieve(task)
        # 3. Reflexion: execute with self-improvement.
        result = await self.reflexion.execute_with_reflexion(
            task,
            evaluator=self._evaluate_with_context,
        )
        if allow_evolution:
            # 4. Store the episode in agentic memory.
            await self.memory.store(
                content=f"Task: {task}\nOutcome: {result['result']}",
                node_type="episodic",
                importance=0.8 if result["success"] else 0.3,
            )
            # 5. Update meta-learning with the observed outcome.
            # BUG FIX: adapted_strategy may be explicitly None, in which case
            # the original .get("adapted_strategy", {}).get(...) crashed.
            adapted = adaptation.get("adapted_strategy") or {}
            self.meta_learner.record_task_outcome(
                task=task,
                task_embedding=await self.memory.embeddings.embed(task),
                strategy_id=adapted.get("name", "default"),
                success=result["success"],
                effectiveness=1.0 if result["success"] else 0.0,
            )
            # 6. Workflow tuning: occasionally (10%) run an experiment.
            # BUG FIX: the original always passed the id "current", which is
            # never registered and raised KeyError; experiment on a known
            # registered variant instead, if any exist.
            if self.workflow_tuner.variants and np.random.random() < 0.1:
                variant_id = next(iter(self.workflow_tuner.variants))
                await self.workflow_tuner.run_experiment(
                    variant_id,
                    task,
                    self._execute_for_tuning,
                )
        return result

    async def _evaluate_with_context(self, task: str, result: Dict[str, Any]) -> Dict[str, Any]:
        """Evaluator hook (was referenced but undefined): delegate to the
        reflexion agent's LLM self-evaluation."""
        return await self.reflexion._self_evaluate(task, result)

    async def _execute_for_tuning(self, task: str, config: Dict[str, Any]) -> Dict[str, Any]:
        """Executor hook for workflow tuning (was referenced but undefined).

        `config` is currently unused — TODO: thread variant parameters
        (temperature, max_iterations, ...) into the reflexion run.
        """
        outcome = await self.reflexion.execute_with_reflexion(task)
        return {"success": outcome["success"]}
Quick Reference
Evolution Mechanism Selection
| Scenario | Recommended | Why |
|---|---|---|
| Task failures | Reflexion | Immediate verbal feedback |
| Accumulated data | Continual Learning | Model improvement |
| New task types | Meta-Learning | Fast adaptation |
| Performance optimization | Workflow Tuning | Systematic experimentation |
| Knowledge accumulation | Agentic Memory | Self-organizing storage |
Evolution Timing
| Mechanism | Trigger | Frequency | Resource Cost |
|---|---|---|---|
| Reflexion | Every failure | Real-time | Low |
| Continual Learning | Experience threshold | Hours/days | High |
| Meta-Learning | New task type | On-demand | Medium |
| Workflow Tuning | Performance degradation | Weekly | Medium |
| Agentic Memory | Every interaction | Continuous | Low |
Document maintained by CODITECT ML Team. Feedback: ml@coditect.com