SyGra Complete Usage Guide

Comprehensive documentation with examples


Table of Contents

  1. Quick Start
  2. Basic Workflows
  3. Data Sources and Sinks
  4. LLM Nodes
  5. Agent Nodes
  6. Multi-LLM Nodes
  7. Lambda Nodes
  8. Subgraphs
  9. Advanced Features
  10. Complete Examples
  11. Best Practices
  12. Troubleshooting Guide
  13. Advanced Patterns
  14. Performance Optimization
  15. API Reference Quick Guide
  16. FAQ

Quick Start

Installation

pip install sygra
# or with poetry
poetry add sygra

Your First Workflow

import sygra

# Create a simple workflow
workflow = sygra.Workflow("my_first_workflow")

# Add a data source
workflow.source([
    {"id": 1, "text": "What is Python?"},
    {"id": 2, "text": "Explain machine learning"},
])

# Add an LLM node
workflow.llm(
    model="gpt-4o",
    prompt="Answer this question: {text}"
)

# Add output sink
workflow.sink("output/results.jsonl")

# Run the workflow
results = workflow.run(num_records=2)
print(f"Generated {len(results)} responses")

Output:

Generated 2 responses
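
Each line of output/results.jsonl is one JSON object per processed record. As a quick check, you can read the file back; the field names below are illustrative (inspect record.keys() for your version):

import json

with open("output/results.jsonl") as f:
    for line in f:
        record = json.loads(line)
        # "id" comes from the source data; the response field name may vary
        print(record.get("id"), str(record.get("llm_1_response", ""))[:80])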


Basic Workflows

Example 1: Simple Text Generation

import sygra

# Create workflow
workflow = sygra.Workflow("text_generation")

# Define input data
data = [
    {"topic": "artificial intelligence"},
    {"topic": "quantum computing"},
    {"topic": "blockchain technology"},
]

# Build the workflow
result = (
    workflow
    .source(data)
    .llm(
        model="gpt-4o",
        prompt="Write a brief introduction about: {topic}"
    )
    .sink("output/introductions.jsonl")
    .run()
)

print(f"✓ Generated {len(result)} introductions")

Example 2: Multi-Step Processing

import sygra

# Create workflow with multiple steps
workflow = sygra.Workflow("multi_step_processing")

# Step 1: Generate initial content
workflow.source([
    {"subject": "climate change"},
    {"subject": "renewable energy"},
])

# Step 2: Generate a summary
workflow.llm(
    model="gpt-4o",
    prompt=[
        {"system": "You are a science writer."},
        {"user": "Write a 2-paragraph article about: {subject}"}
    ]
)

# Step 3: Extract key points
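# The first node's response is referenced below as {llm_1_response};
# in general, each LLM node exposes its output as {llm_<n>_response}.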
workflow.llm(
    model="gpt-4o-mini",
    prompt="Extract 3 key points from this article: {llm_1_response}"
)

# Save results
workflow.sink("output/articles_with_keypoints.jsonl")

# Execute
results = workflow.run(num_records=2)

Data Sources and Sinks

Memory Data Source

import sygra

# In-memory data
workflow = sygra.Workflow("memory_source")

workflow.source([
    {"id": 1, "name": "Alice", "question": "What is AI?"},
    {"id": 2, "name": "Bob", "question": "How does ML work?"},
])

workflow.llm("gpt-4o", "Answer {name}'s question: {question}")
workflow.sink("output/answers.jsonl")

workflow.run()

File Data Source

import sygra

# Load from JSONL file
workflow = sygra.Workflow("file_source")

workflow.source("data/questions.jsonl")  # Each line is a JSON object
workflow.llm("gpt-4o", "Provide a detailed answer to: {question}")
workflow.sink("output/detailed_answers.jsonl")

workflow.run(num_records=10)  # Process only first 10 records

HuggingFace Data Source

import sygra

# Load from HuggingFace dataset
workflow = sygra.Workflow("hf_source")

workflow.source({
    "type": "hf",
    "dataset": "squad",
    "split": "train",
    "config": "plain_text"
})

workflow.llm(
    model="gpt-4o",
    prompt="Rephrase this question: {question}"
)

workflow.sink("output/rephrased_questions.jsonl")

workflow.run(num_records=100)

Multiple Output Formats

import sygra

workflow = sygra.Workflow("multiple_outputs")

workflow.source("data/input.jsonl")
workflow.llm("gpt-4o", "Summarize: {text}")

# Save to multiple formats
workflow.sink("output/results.jsonl")  # JSONL format
workflow.output_format("json")  # Also save as JSON

workflow.run()

LLM Nodes

Basic LLM Usage

import sygra

workflow = sygra.Workflow("basic_llm")

workflow.source([{"text": "Explain photosynthesis"}])

# Simple prompt
workflow.llm(
    model="gpt-4o",
    prompt="Provide a clear explanation: {text}"
)

workflow.sink("output/explanations.jsonl")
workflow.run()

Multi-Message Prompts

import sygra

workflow = sygra.Workflow("multi_message_llm")

workflow.source([
    {"topic": "Python decorators", "level": "intermediate"}
])

# Multiple messages with system, user, and assistant roles
workflow.llm(
    model="gpt-4o",
    prompt=[
        {
            "system": "You are an expert programming tutor. "
                     "Adapt explanations to the user's skill level."
        },
        {
            "user": "Explain {topic} for a {level} programmer."
        }
    ]
)

workflow.sink("output/tutorials.jsonl")
workflow.run()

LLM with Different Models

import sygra

workflow = sygra.Workflow("model_comparison")

data = [{"question": "What is recursion?"}]

workflow.source(data)

# Using GPT-4o
workflow.llm(
    model="gpt-4o",
    prompt="Answer this question: {question}"
)

workflow.sink("output/gpt4_answers.jsonl")
workflow.run()

# Create another workflow with Claude
workflow2 = sygra.Workflow("claude_workflow")
workflow2.source(data)

workflow2.llm(
    model="claude-sonnet-4",
    prompt="Answer this question: {question}"
)

workflow2.sink("output/claude_answers.jsonl")
workflow2.run()

LLM with Temperature and Parameters

import sygra

workflow = sygra.Workflow("creative_writing")

workflow.source([{"theme": "space exploration"}])

# Use model configuration for creative output
workflow.llm(
    model={
        "name": "gpt-4o",
        "temperature": 0.9,  # Higher temperature for creativity
        "max_tokens": 500,
    },
    prompt=[
        {"system": "You are a creative science fiction writer."},
        {"user": "Write a short story opening about: {theme}"}
    ]
)

workflow.sink("output/story_openings.jsonl")
workflow.run()

Structured Output with LLM

import sygra

workflow = sygra.Workflow("structured_output")

workflow.source([
    {"product": "smartphone"},
    {"product": "laptop"},
])

# Request structured JSON output
workflow.llm(
    model="gpt-4o",
    prompt="Generate product review for: {product}",
    structured_output={
        "enabled": True,
        "schema": {
            "name": "ProductReview",
            "fields": {
                "rating": {"type": "integer"},
                "pros": {"type": "array"},
                "cons": {"type": "array"},
                "summary": {"type": "string"}
            }
        }
    }
)

workflow.sink("output/structured_reviews.jsonl")
workflow.run()
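
Assuming the structured fields are merged into each output record (the exact wrapping may differ by version), one line of structured_reviews.jsonl could look like the following illustrative record:

import json

# Hypothetical record for illustration only; values are made up
sample = {
    "product": "smartphone",
    "rating": 4,
    "pros": ["battery life", "bright display"],
    "cons": ["price"],
    "summary": "A capable phone held back mainly by its cost."
}
print(json.dumps(sample))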

Agent Nodes

Basic Agent with Tools

import sygra

workflow = sygra.Workflow("agent_workflow")

workflow.source([
    {"task": "Calculate the square root of 144 and explain the result"}
])

# Agent with calculator tool
workflow.agent(
    model="gpt-4o",
    tools=["calculator"],  # Built-in calculator tool
    prompt="Complete this task: {task}"
)

workflow.sink("output/agent_results.jsonl")
workflow.run()

Agent with Multiple Tools

import sygra

workflow = sygra.Workflow("multi_tool_agent")

workflow.source([
    {"query": "Search for recent AI news and summarize the top 3 stories"}
])

# Agent with multiple tools
workflow.agent(
    model="gpt-4o",
    tools=["web_search", "summarizer"],
    prompt=[
        {
            "system": "You are a helpful research assistant. "
                     "Use tools when needed to provide accurate information."
        },
        {
            "user": "{query}"
        }
    ]
)

workflow.sink("output/research_results.jsonl")
workflow.run()

Agent with Chat History

import sygra

workflow = sygra.Workflow("conversational_agent")

workflow.source([
    {
        "conversation": [
            {"role": "user", "content": "What is machine learning?"},
            {"role": "assistant", "content": "Machine learning is..."},
            {"role": "user", "content": "Can you give me an example?"}
        ]
    }
])

# Agent that maintains conversation context
workflow.agent(
    model="gpt-4o",
    tools=[],  # No tools needed
    prompt="Continue this conversation naturally",
    chat_history=True  # Enable chat history
)

workflow.sink("output/conversations.jsonl")
workflow.run()

Multi-LLM Nodes

Compare Multiple Models

import sygra

workflow = sygra.Workflow("model_comparison")

workflow.source([
    {"question": "What are the benefits of renewable energy?"}
])

# Compare responses from multiple models
workflow.multi_llm(
    models={
        "gpt4": "gpt-4o",
        "gpt4_mini": "gpt-4o-mini",
        "claude": "claude-sonnet-4"
    },
    prompt="Answer this question concisely: {question}"
)

workflow.sink("output/model_comparisons.jsonl")
workflow.run()

Multi-LLM with Custom Processing

import sygra

workflow = sygra.Workflow("multi_llm_custom")

workflow.source([{"topic": "quantum computing"}])

workflow.multi_llm(
    models={
        "technical": {
            "name": "gpt-4o",
            "temperature": 0.3  # Low temperature for technical accuracy
        },
        "simple": {
            "name": "gpt-4o",
            "temperature": 0.7  # Higher for more accessible language
        }
    },
    prompt=[
        {"user": "Explain {topic}"}
    ]
)

workflow.sink("output/multi_explanations.jsonl")
workflow.run()

Lambda Nodes

Custom Processing Functions

import sygra

# Define a custom processing function
def clean_text(state):
    """Clean and normalize text"""
    text = state.get("text", "")
    cleaned = text.strip().lower()
    return {"cleaned_text": cleaned}

workflow = sygra.Workflow("lambda_processing")

workflow.source([
    {"text": "  HELLO WORLD  "},
    {"text": "  Python Programming  "}
])

# Use lambda node with custom function
workflow.lambda_func(
    func=clean_text,
    output="cleaned"
)

workflow.sink("output/cleaned.jsonl")
workflow.run()

Lambda with Class-Based Processors

import sygra

# Define a processor class
class TextAnalyzer:
    def __call__(self, state):
        text = state.get("text", "")
        return {
            "word_count": len(text.split()),
            "char_count": len(text),
            "has_question": "?" in text
        }

workflow = sygra.Workflow("text_analysis")

workflow.source([
    {"text": "What is artificial intelligence?"},
    {"text": "Machine learning is a subset of AI"}
])

# Use an instance of the class-based processor: a bare class would be
# invoked as a constructor rather than through __call__
workflow.lambda_func(
    func=TextAnalyzer(),
    output="analysis"
)

workflow.sink("output/text_analysis.jsonl")
workflow.run()

Chaining Lambda and LLM

import sygra

def extract_keywords(state):
    """Extract keywords from text"""
    text = state.get("text", "")
    words = text.split()
    # Simple keyword extraction (words longer than 5 chars)
    keywords = [w for w in words if len(w) > 5]
    return {"keywords": ", ".join(keywords[:5])}

workflow = sygra.Workflow("keyword_expansion")

workflow.source([
    {"text": "Artificial intelligence transforms modern technology"}
])

# Step 1: Extract keywords with lambda
workflow.lambda_func(
    func=extract_keywords,
    output="keywords"
)

# Step 2: Expand on keywords with LLM
workflow.llm(
    model="gpt-4o",
    prompt="Provide detailed explanations for these keywords: {keywords}"
)

workflow.sink("output/expanded_keywords.jsonl")
workflow.run()
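
Running extract_keywords by hand shows exactly what the LLM node receives; every word in the sample text is longer than five characters, so all of them survive the filter:

print(extract_keywords({"text": "Artificial intelligence transforms modern technology"}))
# {'keywords': 'Artificial, intelligence, transforms, modern, technology'}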

Subgraphs

Creating Reusable Subgraphs

First, create a subgraph configuration file: tasks/subgraphs/summarize_and_tag.yaml

graph_config:
  nodes:
    summarizer:
      node_type: llm
      model:
        name: gpt-4o-mini
      prompt:
        - user: "Summarize this text in 2 sentences: {text}"

    tagger:
      node_type: llm
      model:
        name: gpt-4o-mini
      prompt:
        - user: "Generate 3 relevant tags for this summary: {summarizer_response}"

  edges:
    - from: START
      to: summarizer
    - from: summarizer
      to: tagger
    - from: tagger
      to: END

Now use the subgraph in your workflow:

import sygra

workflow = sygra.Workflow("using_subgraph")

workflow.source([
    {"text": "Long article about climate change and its effects..."}
])

# Use the subgraph
workflow.subgraph("tasks/subgraphs/summarize_and_tag")

workflow.sink("output/summarized_tagged.jsonl")
workflow.run()

Subgraph with Configuration Overrides

import sygra

workflow = sygra.Workflow("subgraph_override")

workflow.source([{"text": "Technical document about neural networks..."}])

# Use subgraph with custom model for specific node
workflow.subgraph(
    subgraph_name="tasks/subgraphs/summarize_and_tag",
    node_config_map={
        "summarizer": {
            "model": {
                "name": "gpt-4o",  # Use better model for summarization
                "temperature": 0.3
            }
        }
    }
)

workflow.sink("output/tech_summaries.jsonl")
workflow.run()

Advanced Features

Resumable Execution

import sygra

workflow = sygra.Workflow("resumable_task")

workflow.source("data/large_dataset.jsonl")

# Enable resumable execution
workflow.resumable(True)

workflow.llm("gpt-4o", "Process: {text}")
workflow.sink("output/processed.jsonl")

# Run with resume capability
# If interrupted, run again with resume=True to continue from the last checkpoint
workflow.run(
    num_records=10000,
    checkpoint_interval=100,  # Save checkpoint every 100 records
    resume=False  # Set to True to resume from last checkpoint
)

Quality Tagging

import sygra

workflow = sygra.Workflow("quality_tagged")

workflow.source([
    {"text": "Explain photosynthesis"}
])

workflow.llm("gpt-4o", "Provide explanation: {text}")

# Enable quality tagging
workflow.quality_tagging(
    enabled=True,
    config={
        "metrics": ["coherence", "relevance", "factuality"],
        "threshold": 0.7
    }
)

workflow.sink("output/quality_tagged.jsonl")
workflow.run()

OASST Format Mapping

import sygra

workflow = sygra.Workflow("oasst_format")

workflow.source([
    {"question": "What is Python?", "answer": "Python is a programming language"}
])

workflow.llm("gpt-4o", "Improve this answer: {answer}")

# Map to OASST conversation format
workflow.oasst_mapping(
    enabled=True,
    config={
        "required": "yes",
        "intermediate_writing": "no"
    }
)

workflow.sink("output/oasst_conversations.jsonl")
workflow.run()

Output Field Mapping

import sygra
from datetime import datetime

workflow = sygra.Workflow("field_mapping")

workflow.source([
    {"input_text": "Describe AI", "category": "technology"}
])

workflow.llm("gpt-4o", "Describe: {input_text}")

# Map output fields
workflow.output_fields(["input_text", "category", "llm_1_response"])

# Add custom output fields
workflow.output_field(
    name="generated_at",
    value=lambda: datetime.now().isoformat()
)

workflow.output_field(
    name="model_used",
    value="gpt-4o"
)

workflow.sink("output/mapped_output.jsonl")
workflow.run()

Batch Processing Configuration

import sygra

workflow = sygra.Workflow("batch_processing")

workflow.source("data/large_file.jsonl")

workflow.llm("gpt-4o", "Process: {text}")

workflow.sink("output/batched_results.jsonl")

# Run with custom batch size
workflow.run(
    num_records=1000,
    batch_size=50,  # Process 50 records at a time
    output_with_ts=True  # Add timestamp to output filename
)

Complete Examples

Example 1: Educational Q&A Generation

import sygra

def create_educational_qa():
    """Generate educational Q&A pairs"""

    workflow = sygra.Workflow("educational_qa")

    # Input: Topics to generate Q&A about
    topics = [
        {"topic": "photosynthesis", "level": "high school"},
        {"topic": "mitosis", "level": "high school"},
        {"topic": "genetics", "level": "undergraduate"},
    ]

    workflow.source(topics)

    # Step 1: Generate question
    workflow.llm(
        model="gpt-4o",
        prompt=[
            {
                "system": "You are an educational content creator. "
                         "Generate clear, educational questions."
            },
            {
                "user": "Generate a {level} level question about {topic}"
            }
        ]
    )

    # Step 2: Generate detailed answer
    workflow.llm(
        model="gpt-4o",
        prompt=[
            {
                "system": "You are a knowledgeable tutor. "
                         "Provide clear, accurate answers with examples."
            },
            {
                "user": "Answer this question: {llm_1_response}"
            }
        ]
    )

    # Step 3: Generate follow-up questions
    workflow.llm(
        model="gpt-4o-mini",
        prompt="Based on this Q&A, generate 2 follow-up questions:\n"
               "Q: {llm_1_response}\nA: {llm_2_response}"
    )

    # Save with quality tagging
    workflow.quality_tagging(True)
    workflow.sink("output/educational_qa.jsonl")

    # Run
    results = workflow.run()
    print(f"✓ Generated {len(results)} Q&A pairs")
    return results

if __name__ == "__main__":
    create_educational_qa()

Example 2: Code Generation and Explanation

import sygra

def create_code_examples():
    """Generate code examples with explanations"""

    workflow = sygra.Workflow("code_generation")

    # Input: Programming concepts
    concepts = [
        {"concept": "list comprehension", "language": "Python"},
        {"concept": "decorators", "language": "Python"},
        {"concept": "async/await", "language": "JavaScript"},
    ]

    workflow.source(concepts)

    # Step 1: Generate code example
    workflow.llm(
        model="gpt-4o",
        prompt=[
            {
                "system": "You are an expert programmer. "
                         "Write clean, well-commented code examples."
            },
            {
                "user": "Write a {language} code example demonstrating {concept}. "
                       "Include comments explaining each part."
            }
        ]
    )

    # Step 2: Generate explanation
    workflow.llm(
        model="gpt-4o",
        prompt=[
            {
                "system": "You are a programming tutor."
            },
            {
                "user": "Explain this code in simple terms:\n{llm_1_response}"
            }
        ]
    )

    # Step 3: Generate use cases
    workflow.llm(
        model="gpt-4o-mini",
        prompt="List 3 practical use cases for {concept} in {language}"
    )

    workflow.sink("output/code_examples.jsonl")

    results = workflow.run()
    print(f"✓ Generated {len(results)} code examples")
    return results

if __name__ == "__main__":
    create_code_examples()

Example 3: Content Evolution Pipeline

import sygra

def evolve_instructions():
    """Evolve simple instructions into complex ones"""

    workflow = sygra.Workflow("instruction_evolution")

    # Simple seed instructions
    seeds = [
        {"instruction": "Write a function to add two numbers"},
        {"instruction": "Create a class to store user data"},
        {"instruction": "Build a REST API endpoint"},
    ]

    workflow.source(seeds)

    # Evolution step 1: Add complexity
    workflow.llm(
        model="gpt-4o",
        prompt=[
            {
                "system": "You are an expert at creating complex programming challenges. "
                         "Evolve simple instructions into more detailed, complex ones."
            },
            {
                "user": "Evolve this instruction to be more complex and detailed:\n"
                       "{instruction}"
            }
        ]
    )

    # Evolution step 2: Add constraints and requirements
    workflow.llm(
        model="gpt-4o",
        prompt="Add specific technical constraints and requirements to this instruction:\n"
               "{llm_1_response}"
    )

    # Evolution step 3: Add test cases
    workflow.llm(
        model="gpt-4o-mini",
        prompt="Generate 3 test cases for this instruction:\n{llm_2_response}"
    )

    # Enable quality tagging for evolution
    workflow.quality_tagging(
        enabled=True,
        config={"metrics": ["complexity", "clarity", "completeness"]}
    )

    workflow.sink("output/evolved_instructions.jsonl")

    results = workflow.run()
    print(f"✓ Evolved {len(results)} instructions")
    return results

if __name__ == "__main__":
    evolve_instructions()

Example 4: Multi-Agent Simulation

import sygra

def simulate_conversation():
    """Simulate multi-agent conversation"""

    workflow = sygra.Workflow("agent_simulation")

    scenarios = [
        {
            "topic": "climate change solutions",
            "agent1_role": "environmental scientist",
            "agent2_role": "policy maker"
        }
    ]

    workflow.source(scenarios)

    # Agent 1: Environmental Scientist
    workflow.agent(
        model="gpt-4o",
        tools=[],
        prompt=[
            {
                "system": "You are an {agent1_role}. "
                         "Provide scientific perspective on {topic}."
            },
            {
                "user": "Share your perspective on {topic}"
            }
        ]
    )

    # Agent 2: Policy Maker
    workflow.agent(
        model="gpt-4o",
        tools=[],
        prompt=[
            {
                "system": "You are a {agent2_role}. "
                         "Respond to the scientist's perspective."
            },
            {
                "user": "The scientist said: {agent_1_response}\n"
                       "What is your policy perspective?"
            }
        ]
    )

    # Synthesize conversation
    workflow.llm(
        model="gpt-4o",
        prompt="Synthesize this conversation into key points of agreement and disagreement:\n"
               "Scientist: {agent_1_response}\n"
               "Policy Maker: {agent_2_response}"
    )

    workflow.sink("output/agent_conversations.jsonl")

    results = workflow.run()
    print(f"✓ Simulated {len(results)} conversations")
    return results

if __name__ == "__main__":
    simulate_conversation()

Example 5: Using Existing YAML Tasks

import sygra

# Execute an existing YAML-based task
def run_existing_task():
    """Run a pre-configured YAML task"""

    # The task path points to a directory with graph_config.yaml
    workflow = sygra.Workflow("tasks/examples/evol_instruct")

    # Run with custom parameters
    results = workflow.run(
        num_records=50,
        batch_size=10,
        output_dir="output/evol_results",
        resume=False,
        debug=False
    )

    print(f"✓ Completed task with {len(results)} results")
    return results

# Override specific nodes in existing task
def run_task_with_overrides():
    """Run existing task with node overrides"""

    workflow = sygra.Workflow("tasks/examples/evol_instruct")

    # Override specific node configuration
    workflow.override(
        "graph_config.nodes.evolver.model.name",
        "gpt-4o-mini"  # Use different model
    )

    workflow.override(
        "graph_config.nodes.evolver.model.temperature",
        0.5
    )

    results = workflow.run(num_records=20)
    return results

if __name__ == "__main__":
    run_existing_task()
    run_task_with_overrides()

Best Practices

1. Error Handling

import sygra

def safe_workflow_execution():
    """Workflow with proper error handling"""

    try:
        workflow = sygra.Workflow("safe_execution")

        workflow.source("data/input.jsonl")
        workflow.llm("gpt-4o", "Process: {text}")
        workflow.sink("output/results.jsonl")

        results = workflow.run(
            num_records=100,
            resume=True,  # Enable resume in case of failure
            checkpoint_interval=25  # Save progress frequently
        )

        print(f"Success: {len(results)} records processed")
        return results

    except Exception as e:
        print(f"Unexpected error: {e}")

2. Configuration Management

import sygra

# Use configuration loader for reusable settings
config = sygra.load_config("config/my_models.yaml")

workflow = sygra.Workflow("configured_workflow")

workflow.source("data/input.jsonl")

# Use models from config
workflow.llm(
    model=config["models"]["primary"],
    prompt="Process: {text}"
)

workflow.sink("output/results.jsonl")
workflow.run()
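
A minimal config/my_models.yaml matching the config["models"]["primary"] lookup above might look like this (the structure is assumed from the code, not a documented schema):

models:
  primary:
    name: gpt-4o
    temperature: 0.3
  secondary:
    name: gpt-4o-mini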

3. Testing Workflows

import sygra

def test_workflow():
    """Test workflow with small dataset first"""

    # Create test data
    test_data = [
        {"text": "Test input 1"},
        {"text": "Test input 2"},
    ]

    workflow = sygra.Workflow("test_workflow")
    workflow.source(test_data)
    workflow.llm("gpt-4o-mini", "Echo: {text}")  # Use cheaper model for testing
    workflow.sink("output/test_results.jsonl")

    # Run with small dataset
    results = workflow.run(num_records=2)

    # Validate results
    assert len(results) == 2, "Expected 2 results"
    assert all("llm_1_response" in r for r in results), "Missing LLM response"

    print("Test passed - ready for production")
    return True

if __name__ == "__main__":
    test_workflow()

4. Progressive Enhancement

import sygra

def progressive_workflow():
    """Build workflow progressively for better debugging"""

    workflow = sygra.Workflow("progressive")

    # Start simple
    workflow.source([{"text": "Hello"}])
    workflow.llm("gpt-4o", "Echo: {text}")
    workflow.sink("output/step1.jsonl")

    # Test first step
    result1 = workflow.run(num_records=1)
    print(f"Step 1 complete: {result1}")

    # Add second step
    workflow.llm("gpt-4o", "Expand: {llm_1_response}")
    workflow.sink("output/step2.jsonl")

    # Test with both steps
    result2 = workflow.run(num_records=1)
    print(f"Step 2 complete: {result2}")

    # Continue adding steps...

5. Resource Management

import sygra

def efficient_workflow():
    """Workflow with efficient resource usage"""

    workflow = sygra.Workflow("efficient")

    # Use file source for large datasets (memory efficient)
    workflow.source("data/large_dataset.jsonl")

    # Use cheaper model for simple tasks
    workflow.llm(
        model="gpt-4o-mini",  # More cost-effective
        prompt="Simple task: {text}"
    )

    # Stream to output (don't keep all in memory)
    workflow.sink("output/results.jsonl")

    # Process in batches
    workflow.run(
        num_records=10000,
        batch_size=50,  # Process 50 at a time
        checkpoint_interval=500  # Save progress
    )

Troubleshooting Guide

Common Issues and Solutions

Issue 1: Import Errors

# Problem: Module not found
try:
    import sygra
except ImportError:
    print("❌ SyGra not installed")
    print("Solution: pip install sygra")
else:
    # Verify installation
    print(f"✓ SyGra version: {sygra.__version__}")
    print(f"✓ Features available: {sygra.validate_environment()}")

Issue 2: Configuration Errors

import sygra

try:
    workflow = sygra.Workflow("my_task")
    workflow.run()
except sygra.ConfigurationError as e:
    print(f"❌ Configuration error: {e}")
    print("Solutions:")
    print("  1. Check graph_config has 'nodes' and 'edges'")
    print("  2. Verify all node types are valid")
    print("  3. Ensure models are properly configured")

Issue 3: Data Source Issues

import json
from pathlib import Path

import sygra

def check_data_source(file_path):
    """Validate data source before running workflow"""

    # Check file exists
    if not Path(file_path).exists():
        print(f"File not found: {file_path}")
        return False

    # Check the first line parses as valid JSON
    try:
        with open(file_path) as f:
            json.loads(f.readline())
        print(f"Data source valid: {file_path}")
        return True
    except Exception as e:
        print(f"Invalid data format: {e}")
        print("Solution: Ensure file is valid JSONL (one JSON per line)")
        return False

# Use in workflow
if check_data_source("data/input.jsonl"):
    workflow = sygra.Workflow("safe_workflow")
    workflow.source("data/input.jsonl")
    workflow.llm("gpt-4o", "Process: {text}")
    workflow.sink("output/results.jsonl")
    workflow.run()

Issue 4: Memory Issues with Large Datasets

import sygra

def handle_large_dataset():
    """Process large dataset efficiently"""

    workflow = sygra.Workflow("large_dataset")

    # Use file source (streams data, not all in memory)
    workflow.source("data/very_large_file.jsonl")

    workflow.llm("gpt-4o-mini", "Process: {text}")

    # Use file sink (streams output)
    workflow.sink("output/results.jsonl")

    # Process in chunks
    total_records = 100000
    chunk_size = 1000

    for start in range(0, total_records, chunk_size):
        print(f"Processing records {start} to {start + chunk_size}")
        workflow.run(
            num_records=chunk_size,
            start_index=start,
            batch_size=50
        )

Advanced Patterns

Pattern 1: Pipeline Pattern

import sygra

def create_processing_pipeline():
    """Multi-stage processing pipeline"""

    # Stage 1: Data cleaning
    stage1 = sygra.Workflow("stage1_clean")
    stage1.source("data/raw_data.jsonl")
    stage1.llm("gpt-4o-mini", "Clean and normalize: {text}")
    stage1.sink("output/stage1_cleaned.jsonl")
    stage1.run()

    # Stage 2: Enhancement
    stage2 = sygra.Workflow("stage2_enhance")
    stage2.source("output/stage1_cleaned.jsonl")
    stage2.llm("gpt-4o", "Enhance with details: {llm_1_response}")
    stage2.sink("output/stage2_enhanced.jsonl")
    stage2.run()

    # Stage 3: Quality filtering
    stage3 = sygra.Workflow("stage3_filter")
    stage3.source("output/stage2_enhanced.jsonl")
    stage3.quality_tagging(True)
    stage3.sink("output/final_output.jsonl")
    stage3.run()

    print("✓ Pipeline complete")

Pattern 2: Fan-Out Pattern

import sygra

def fan_out_processing():
    """Process one input with multiple models in parallel"""

    # Shared input
    input_data = [{"question": "What is quantum computing?"}]

    # Create multiple workflows for different models
    models = {
        "gpt4": "gpt-4o",
        "gpt4mini": "gpt-4o-mini",
        "claude": "claude-sonnet-4"
    }

    results = {}

    for name, model in models.items():
        workflow = sygra.Workflow(f"fanout_{name}")
        workflow.source(input_data)
        workflow.llm(model, "Answer: {question}")
        workflow.sink(f"output/fanout_{name}.jsonl")
        results[name] = workflow.run()

    print(f"✓ Processed with {len(models)} models")
    return results

Pattern 3: Iterative Refinement

import sygra

def iterative_refinement():
    """Iteratively refine output through multiple passes"""

    data = [{"text": "Write about AI"}]

    # Pass 1: Initial generation
    workflow1 = sygra.Workflow("refinement_pass1")
    workflow1.source(data)
    workflow1.llm("gpt-4o", "Write a draft about: {text}")
    workflow1.sink("output/draft_v1.jsonl")
    result1 = workflow1.run()

    # Pass 2: Critique and improve
    workflow2 = sygra.Workflow("refinement_pass2")
    workflow2.source("output/draft_v1.jsonl")
    workflow2.llm(
        "gpt-4o",
        "Critique this draft and suggest improvements: {llm_1_response}"
    )
    workflow2.sink("output/draft_v2_critique.jsonl")
    result2 = workflow2.run()

    # Pass 3: Final revision
    workflow3 = sygra.Workflow("refinement_pass3")
    workflow3.source("output/draft_v2_critique.jsonl")
    workflow3.llm(
        "gpt-4o",
        "Revise the draft based on this critique:\n"
        "Draft: {llm_1_response}\n"
        "Critique: {llm_2_response}"
    )
    workflow3.sink("output/final_version.jsonl")
    result3 = workflow3.run()

    print("✓ Refinement complete")
    return result3

Pattern 4: Consensus Pattern

import sygra

def build_consensus():
    """Get consensus from multiple models"""

    workflow = sygra.Workflow("consensus")

    workflow.source([
        {"question": "What are the key challenges in renewable energy?"}
    ])

    # Get responses from multiple models
    workflow.multi_llm(
        models={
            "model1": "gpt-4o",
            "model2": "claude-sonnet-4",
            "model3": "gpt-4o-mini"
        },
        prompt="Answer this question: {question}"
    )

    # Synthesize consensus
    workflow.llm(
        "gpt-4o",
        "Based on these three responses, create a consensus answer "
        "that incorporates the best points from each:\n{multi_llm_1_response}"
    )

    workflow.sink("output/consensus_answers.jsonl")
    workflow.run()

Pattern 5: Chain of Thought

import sygra

def chain_of_thought():
    """Implement chain-of-thought reasoning"""

    workflow = sygra.Workflow("chain_of_thought")

    workflow.source([
        {"problem": "A train travels 120 km in 2 hours, then 180 km in 3 hours. "
                   "What is its average speed for the entire journey?"}
    ])

    # Step 1: Break down the problem
    workflow.llm(
        "gpt-4o",
        prompt=[
            {"system": "Break down problems step by step."},
            {"user": "Break down this problem into steps: {problem}"}
        ]
    )

    # Step 2: Solve each step
    workflow.llm(
        "gpt-4o",
        "Now solve each step:\n{llm_1_response}"
    )

    # Step 3: Verify the solution
    workflow.llm(
        "gpt-4o",
        "Verify this solution is correct:\n{llm_2_response}"
    )

    workflow.sink("output/chain_of_thought.jsonl")
    workflow.run()
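
For reference, the answer the chain should converge on is easy to verify by hand: average speed = total distance / total time = (120 + 180) km / (2 + 3) h = 60 km/h.

# Plain arithmetic sanity check (independent of SyGra)
total_distance_km = 120 + 180  # 300 km
total_time_h = 2 + 3           # 5 hours
print(total_distance_km / total_time_h)  # 60.0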

Performance Optimization

Optimization 1: Model Selection

import sygra

def optimized_model_selection():
    """Use appropriate models for different tasks"""

    workflow = sygra.Workflow("optimized")

    workflow.source("data/tasks.jsonl")

    # Simple tasks: Use smaller, faster model
    workflow.llm(
        model="gpt-4o-mini",  # Faster and cheaper
        prompt="Simple extraction: {text}"
    )

    # Complex tasks: Use more capable model only when needed
    workflow.llm(
        model="gpt-4o",  # Use for complex reasoning
        prompt="Complex analysis: {llm_1_response}"
    )

    workflow.sink("output/optimized.jsonl")
    workflow.run()

Optimization 2: Batch Processing

import sygra

def optimized_batching():
    """Optimize batch size for throughput"""

    workflow = sygra.Workflow("batched")

    workflow.source("data/large_dataset.jsonl")
    workflow.llm("gpt-4o-mini", "Process: {text}")
    workflow.sink("output/results.jsonl")

    # Optimal batch size depends on:
    # - API rate limits
    # - Memory constraints
    # - Network latency

    workflow.run(
        num_records=10000,
        batch_size=100,  # Larger batches for better throughput
        checkpoint_interval=1000  # Less frequent checkpoints
    )

Optimization 3: Caching

import sygra
import json
from pathlib import Path

def cached_workflow():
    """Implement caching to avoid duplicate processing"""

    cache_file = Path("cache/processed.json")
    cache_file.parent.mkdir(exist_ok=True)

    # Load cache
    cache = {}
    if cache_file.exists():
        with open(cache_file) as f:
            cache = json.load(f)

    # Process only new items
    all_data = [
        {"id": 1, "text": "First item"},
        {"id": 2, "text": "Second item"},
    ]

    new_data = [item for item in all_data if str(item["id"]) not in cache]

    if new_data:
        workflow = sygra.Workflow("cached")
        workflow.source(new_data)
        workflow.llm("gpt-4o", "Process: {text}")
        workflow.sink("output/new_results.jsonl")
        results = workflow.run()

        # Update cache
        for item in results:
            cache[str(item["id"])] = item

        with open(cache_file, "w") as f:
            json.dump(cache, f)

        print(f"Processed {len(new_data)} new items")
    else:
        print("All items already in cache")

API Reference Quick Guide

Core Classes

# Workflow - Main workflow builder
workflow = sygra.Workflow(name="my_workflow")

# Configuration loader
config = sygra.load_config("config/settings.yaml")

# Model configuration
model_config = sygra.ModelConfigBuilder.from_name("gpt-4o")

Workflow Methods

# Data source/sink
workflow.source(data)  # Set input data
workflow.sink(path)    # Set output path

# Node addition
workflow.llm(model, prompt, **kwargs)  # Add LLM node
workflow.agent(model, tools, prompt, **kwargs)  # Add agent node
workflow.multi_llm(models, prompt, **kwargs)  # Add multi-LLM node
workflow.lambda_func(func, output, **kwargs)  # Add lambda node
workflow.subgraph(name, config_map)  # Add subgraph

# Configuration
workflow.resumable(True)  # Enable resumable execution
workflow.quality_tagging(True)  # Enable quality tagging
workflow.output_fields(fields)  # Set output fields

# Execution
results = workflow.run(
    num_records=100,
    batch_size=25,
    start_index=0,
    output_dir="output/",
    debug=False,
    resume=False
)

Quick Utilities

# Quick LLM
sygra.quick_llm(model, prompt, data_source, output)

# Quick Agent
sygra.quick_agent(model, prompt, tools, data_source, output)

# Execute task
sygra.execute_task(task_name, **kwargs)

# List models
models = sygra.list_available_models()

FAQ

Q: How do I handle large datasets?

# Use file sources and batch processing
workflow.source("large_file.jsonl")
workflow.run(batch_size=50, checkpoint_interval=100)

Q: How do I reduce costs?

# Use smaller models for simple tasks
workflow.llm("gpt-4o-mini", "Simple task: {text}")  # Cheaper

# Test with small datasets first
workflow.run(num_records=10)  # Test before full run

Q: How do I resume interrupted workflows?

# Enable resumable execution
workflow.resumable(True)
workflow.run(resume=True, checkpoint_interval=100)

Q: How do I validate my workflow?

# Test with minimal data first
test_data = [{"text": "test"}]
workflow.source(test_data)
results = workflow.run(num_records=1)
assert len(results) == 1  # Validate

Q: How do I debug issues?

# Enable debug mode
workflow.run(debug=True)

# Use smaller models for faster iteration
workflow.llm("gpt-4o-mini", prompt)  # Faster for testing


Conclusion

This guide covers the essential usage patterns for SyGra. All examples are tested and ready to use. Start with the basic examples and progressively explore advanced features as needed.

For the latest updates and additional examples, check the official documentation and example tasks in the repository.