Complete guide for using the Streamware framework for stream processing and data pipelines.
# Basic installation
pip install streamware
# With all features
pip install streamware[all]
# Specific features
pip install streamware[kafka,rabbitmq,postgres]
from streamware import flow
# Simple pipeline
result = (
flow("http://api.example.com/data")
| "transform://json"
| "file://write?path=output.json"
).run()
A Flow is the fundamental building block in Streamware. It represents a series of processing steps that data passes through.
from streamware import flow
# Create a flow
my_flow = flow("file://read?path=input.txt")
# Chain operations
my_flow = my_flow | "transform://uppercase" | "file://write?path=output.txt"
# Execute
result = my_flow.run()
Components are the processing units in a flow. Each component has a unique scheme name (e.g. file, http, transform). Custom components are created by subclassing Component and registering them:
from streamware import Component, register
@register("mycomponent")
class MyComponent(Component):
def process(self, data):
# Your processing logic
return processed_data
Streamware uses URI-style syntax for component configuration:
scheme://operation?param1=value1&param2=value2
Examples:
"file://read?path=/tmp/data.json"
"http://api.example.com/users?method=post"
"transform://csv?delimiter=;"
"kafka://consume?topic=events&group=processor"
# Read text file
content = flow("file://read?path=/tmp/input.txt").run()
# Read JSON file
data = flow("file://read?path=/tmp/data.json").run()
# Write text
flow("file://write?path=/tmp/output.txt").run("Hello World")
# Write JSON
flow("file://write?path=/tmp/data.json").run({"key": "value"})
# Append mode
flow("file://write?path=/tmp/log.txt&mode=append").run("Log entry\n")
# Parse JSON string
data = flow("transform://json").run('{"name":"Alice"}')
# Convert to JSON string
json_str = flow("transform://json").run({"name": "Alice"})
# Convert list of dicts to CSV
csv_data = flow("transform://csv").run([
{"name": "Alice", "age": 30},
{"name": "Bob", "age": 25}
])
# Custom delimiter
csv_data = flow("transform://csv?delimiter=;").run(data)
# Encode
encoded = flow("transform://base64").run("Hello World")
# Decode
decoded = flow("transform://base64?decode=true").run(encoded)
# Simple GET
response = flow("http://api.example.com/data").run()
# With parameters
response = flow("http://api.example.com/users?limit=10").run()
# POST with JSON body
response = flow("http://api.example.com/users?method=post").run({
"name": "Alice",
"email": "alice@example.com"
})
# Multi-step pipeline
result = (
flow("http://api.example.com/data")
| "transform://jsonpath?query=$.items[*]"
| "transform://csv"
| "file://write?path=output.csv"
).run()
Process array items individually and collect results:
from streamware.patterns import SplitPattern, JoinPattern
# Split data
splitter = SplitPattern()
items = splitter.split([1, 2, 3, 4, 5])
# Process each item
processed = [item * 2 for item in items]
# Join results
joiner = JoinPattern("list")
result = joiner.join(processed)
Filter data based on conditions:
from streamware.patterns import FilterPattern
# Create filter
age_filter = FilterPattern(lambda x: x.get("age", 0) >= 18)
# Apply filter
users = [
{"name": "Alice", "age": 30},
{"name": "Bob", "age": 17},
{"name": "Charlie", "age": 25}
]
adults = [user for user in users if age_filter.filter(user)]
Aggregate data with various strategies:
from streamware.patterns import JoinPattern
# Sum
joiner = JoinPattern("sum")
total = joiner.join([10, 20, 30, 40]) # Result: 100
# List (collect)
joiner = JoinPattern("list")
items = joiner.join([1, 2, 3]) # Result: [1, 2, 3]
Route data based on conditions:
def process_by_priority(data):
priority = data.get("priority", "normal")
if priority == "high":
return flow("process://urgent").run(data)
elif priority == "normal":
return flow("process://standard").run(data)
else:
return flow("process://batch").run(data)
Handle errors gracefully:
try:
result = (
flow("http://api.example.com/data")
| "transform://json"
| "validate://schema"
| "file://write?path=output.json"
).run()
except ComponentError as e:
print(f"Pipeline error: {e}")
# Handle error or fallback
Process data as a stream:
# Stream processing
for item in flow("kafka://consume?topic=events").stream():
processed = flow("transform://normalize").run(item)
flow("postgres://insert?table=events").run(processed)
file — operations: read, write, delete, watch; parameters: path, mode, encoding
# Examples
flow("file://read?path=/tmp/data.txt")
flow("file://write?path=/tmp/output.json&mode=append")
flow("file://delete?path=/tmp/temp.txt")
http, https — parameters: method, headers, timeout
# Examples
flow("http://api.example.com/users")
flow("http://api.example.com/users?method=post")
transform — operations: json, csv, base64, jsonpath, template
# Examples
flow("transform://json")
flow("transform://csv?delimiter=;")
flow("transform://base64?decode=true")
flow("transform://jsonpath?query=$.items[*]")
flow("email://send?to=user@example.com&subject=Hello").run("Message body")
flow("telegram://send?chat_id=@channel&token=BOT_TOKEN").run("Hello!")
flow("sms://send?provider=twilio&to=+1234567890").run("Alert!")
# Consume
flow("kafka://consume?topic=events&group=processor")
# Produce
flow("kafka://produce?topic=events&key=id")
# Consume
flow("rabbitmq://consume?queue=tasks")
# Publish
flow("rabbitmq://publish?exchange=events&routing_key=new")
# Query
flow("postgres://query?sql=SELECT * FROM users")
# Insert
flow("postgres://insert?table=users")
# Update
flow("postgres://update?table=users&where=id=1")
import pytest
from streamware import flow, Component, register
def test_simple_pipeline():
"""Test basic pipeline"""
data = {"test": "data"}
result = flow("transform://json").run(data)
assert isinstance(result, str)
def test_custom_component():
"""Test custom component"""
@register("test-component")
class TestComponent(Component):
def process(self, data):
return data * 2
result = flow("test-component://").run(5)
assert result == 10
# Run all tests
pytest tests/ -v
# Run with coverage
pytest tests/ -v --cov=streamware --cov-report=term-missing
# Run specific test file
pytest tests/test_streamware.py -v
@register("process")
class ProcessComponent(Component):
"""
Process data with specific transformation
Parameters:
mode (str): Processing mode (default: 'normal')
validate (bool): Enable validation (default: True)
"""
def process(self, data):
mode = self.uri.get_param("mode", "normal")
validate = self.uri.get_param("validate", True)
if validate:
self._validate(data)
return self._process(data, mode)
Always handle errors appropriately:
from streamware.exceptions import ComponentError
try:
result = flow("risky://operation").run(data)
except ComponentError as e:
# Log error
logger.error(f"Operation failed: {e}")
# Fallback strategy
result = default_value
Organize complex pipelines for readability:
# Good: Clear stages
result = (
flow("source://data")
| "validate://schema"
| "transform://normalize"
| "enrich://metadata"
| "sink://destination"
).run()
# Better: With comments
result = (
flow("source://data") # Fetch data
| "validate://schema" # Validate structure
| "transform://normalize" # Normalize format
| "enrich://metadata" # Add metadata
| "sink://destination" # Store result
).run()
Clean up resources properly:
import tempfile
import os
temp_file = tempfile.mktemp(suffix=".json")
try:
result = (
flow("transform://json")
| f"file://write?path={temp_file}"
).run(data)
finally:
if os.path.exists(temp_file):
os.remove(temp_file)
Test components in isolation:
def test_component_isolation():
"""Test component without dependencies"""
component = MyComponent(StreamwareURI("mycomp://"))
# Test with mock data
result = component.process({"test": "data"})
# Assert expectations
assert result["processed"] == True
Enable diagnostics for debugging:
import streamware
# Enable debug logging
streamware.enable_diagnostics(level="DEBUG")
# Use diagnostics in flows
flow("http://api.example.com/data") \
.with_diagnostics(trace=True) \
| "transform://json" \
| "file://write?path=output.json"
# Stream large datasets
for batch in flow("source://large-dataset").stream(batch_size=1000):
process_batch(batch)
# Cache expensive lookups
@cache
def get_reference_data():
return flow("database://reference").run()
See the examples/ directory for complete working examples:
basic_usage.py - Basic patterns and operations
advanced_patterns.py - Advanced workflow patterns
examples_communication.py - Communication components
examples_advanced_communication.py - Production patterns
Built with ❤️ by Softreck