Get started with Streamware in 5 minutes!
# Basic installation
pip install streamware
# With all features
pip install streamware[all]
Create a file hello_streamware.py:
from streamware import flow
# Create a simple pipeline
result = flow("transform://json").run({"message": "Hello, Streamware!"})
print(result)
Run it:
python hello_streamware.py
Output:
{"message": "Hello, Streamware!"}
A Flow is a pipeline of processing steps:
from streamware import flow
# Single step
flow("component://operation")
# Multiple steps (chaining)
flow("step1://") | "step2://" | "step3://"
Components are the building blocks that process data:
# File operations
flow("file://read?path=/tmp/data.txt")
flow("file://write?path=/tmp/output.txt")
# Data transformation
flow("transform://json")
flow("transform://csv")
# HTTP requests
flow("http://api.example.com/data")
Components use URI-style syntax:
scheme://operation?param1=value1&param2=value2
Examples:
"file://read?path=/tmp/data.json"
"http://api.example.com/users?limit=10"
"transform://csv?delimiter=;"
import tempfile
import os
temp_file = os.path.join(tempfile.gettempdir(), "output.json")
data = {"users": ["Alice", "Bob", "Charlie"]}
result = (
    flow("transform://json")
    | f"file://write?path={temp_file}"
).run(data)
print(f"Written to: {temp_file}")
# Fetch data from API, transform it, and save
result = (
    flow("http://api.example.com/data")
    | "transform://jsonpath?query=$.items[*]"
    | "file://write?path=results.json"
).run()
from streamware import Component, register
@register("uppercase")
class UppercaseComponent(Component):
    def process(self, data):
        return data.upper() if isinstance(data, str) else data
# Use your custom component
result = flow("uppercase://").run("hello world")
print(result) # Output: HELLO WORLD
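A registered scheme chains like any built-in step. A quick sketch, assuming /tmp/input.txt exists and file://read returns its contents as a string:

# Mix the custom component with built-in steps
shouted = (
    flow("file://read?path=/tmp/input.txt")
    | "uppercase://"
).run()
print(shouted)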
# Parse JSON string
data = flow("transform://json").run('{"key": "value"}')
# Convert to JSON string
json_str = flow("transform://json").run({"key": "value"})
# Convert list of dicts to CSV
data = [
    {"name": "Alice", "age": 30},
    {"name": "Bob", "age": 25}
]
csv_output = flow("transform://csv").run(data)
print(csv_output)
# Encode
encoded = flow("transform://base64").run("Hello World")
# Decode
decoded = flow("transform://base64?decode=true").run(encoded)
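A quick round-trip check (assuming decode returns the original string rather than bytes):

# The decoded value should match the original input
assert decoded == "Hello World"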
content = flow("file://read?path=/tmp/input.txt").run()
print(content)
flow("file://write?path=/tmp/output.txt").run("Hello, World!")
flow("file://write?path=/tmp/log.txt&mode=append").run("Log entry\n")
try:
    result = flow("file://read?path=/nonexistent.txt").run()
except Exception as e:
    print(f"Error: {e}")
    result = None
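If several pipelines need the same fallback, the pattern can be wrapped in a small helper. This is plain Python, not a Streamware API:

def run_safely(pipeline, data=None, default=None):
    """Run a flow, returning `default` instead of raising on failure."""
    try:
        return pipeline.run(data) if data is not None else pipeline.run()
    except Exception as e:
        print(f"Error: {e}")
        return default

content = run_safely(flow("file://read?path=/nonexistent.txt"), default="")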
# Basic usage patterns
python examples/basic_usage.py
# Advanced patterns
python examples/advanced_patterns.py
from streamware.core import registry
# List all available components
print(registry.list_components())
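One use is checking that a scheme is registered before building a pipeline with it (this assumes list_components() returns scheme names as strings):

# Guard a pipeline behind an availability check
if "uppercase" in registry.list_components():
    print(flow("uppercase://").run("ready"))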
from streamware import flow
import os
def process_data_pipeline():
    """
    Example: Read CSV, process data, save results
    """
    try:
        # Your pipeline here
        result = (
            flow("file://read?path=input.csv")
            | "transform://csv"
            | "your-processing://"
            | "file://write?path=output.json"
        ).run()
        print("Pipeline completed successfully!")
        return result
    except Exception as e:
        print(f"Pipeline failed: {e}")
        return None

# Run it
if __name__ == "__main__":
    process_data_pipeline()
# Extract, Transform, Load
(
    flow("database://query?sql=SELECT * FROM users")
    | "transform://normalize"
    | "transform://validate"
    | "database://insert?table=processed_users"
).run()
# Fetch from API and process
(
    flow("http://api.example.com/data")
    | "transform://jsonpath?query=$.results[*]"
    | "enrich://metadata"
    | "kafka://produce?topic=events"
).run()
# Process multiple files
import glob
import os

for file_path in glob.glob("/data/*.json"):
    (
        flow(f"file://read?path={file_path}")
        | "validate://schema"
        | "transform://clean"
        | f"file://write?path=/processed/{os.path.basename(file_path)}"
    ).run()
# Check system and send alerts
status = flow("system://health").run()
if status["cpu_percent"] > 80:
    flow("slack://send?channel=ops&token=TOKEN").run(
        f"High CPU usage: {status['cpu_percent']}%"
    )
import streamware
streamware.enable_diagnostics(level="DEBUG")
# Now all pipeline operations will be logged
flow("http://api.example.com").run()
# Use | operator for clean chaining
result = (
    flow("step1://")
    | "step2://"
    | "step3://"
    | "step4://"
).run(input_data)
# Create reusable flow
normalize_flow = (
    flow("validate://schema")
    | "transform://normalize"
    | "enrich://metadata"
)
# Use it multiple times
result1 = normalize_flow.run(data1)
result2 = normalize_flow.run(data2)
def process_by_type(data):
    data_type = data.get("type")
    if data_type == "json":
        return flow("transform://json").run(data)
    elif data_type == "csv":
        return flow("transform://csv").run(data)
    else:
        return flow("transform://text").run(data)
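Example dispatch, assuming the incoming payload carries a "type" field:

# Routes to the JSON branch and serializes the payload
print(process_by_type({"type": "json", "message": "hi"}))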
Now that you've learned the basics, explore:
Happy streaming!
Built with ❤️ by Softreck