Streamware provides comprehensive communication components for integrating with email, messaging platforms, and SMS services. These components enable you to build sophisticated multi-channel communication workflows for customer support, notifications, marketing automation, and more.
Install Streamware with communication components:
pip install streamware[communication]
# Or install specific components
pip install streamware[telegram,slack,email]
# Install all components
pip install streamware[all]
from streamware import flow
# Basic email sending
flow("email://send?to=user@example.com&subject=Hello").run("Message body")
# With SMTP configuration
flow("email://send"
"?smtp_host=smtp.gmail.com"
"&smtp_port=587"
"&user=sender@gmail.com"
"&password=app_password"
"&to=recipient@example.com"
).run("Email content")
| Operation | Description | URI Example |
|---|---|---|
send |
Send email | email://send?to=user@example.com |
read |
Read emails from inbox | email://read?folder=INBOX&unread=true |
watch |
Monitor inbox for new emails | email-watch://interval=60 |
delete |
Delete email by ID | email://delete |
search |
Search emails | email://search?criteria=UNSEEN |
# Send HTML email with attachments
result = flow("email://send").with_data({
"to": "user@example.com",
"subject": "Monthly Report",
"body": "Please find attached the monthly report.",
"html": "<h1>Monthly Report</h1><p>See attachment</p>",
"attachments": ["report.pdf", "data.xlsx"]
}).run()
# Watch inbox and process emails
for email in flow("email-watch://interval=30").stream():
print(f"New email from {email['from']}: {email['subject']}")
# Filter and auto-respond
flow("email://read?unread=true") \
| "email-filter://subject=Support" \
| "transform://template?template=Thank you for contacting support..." \
| "email://send"
# Get bot token from @BotFather on Telegram
flow("telegram://send?chat_id=@channel&token=BOT_TOKEN").run("Message")
| Operation | Description | URI Example |
|---|---|---|
send |
Send text message | telegram://send?chat_id=123456 |
send_photo |
Send photo | telegram://send_photo?chat_id=123456 |
send_document |
Send document | telegram://send_document?chat_id=123456 |
poll |
Long polling for updates | telegram://poll?timeout=30 |
webhook |
Set webhook | telegram://webhook?url=https://myapp.com |
# Create Telegram bot
bot_pipeline = (
flow("telegram-bot://token=YOUR_BOT_TOKEN")
| "telegram-command://"
| "choose://"
.when("$.command=='/start'", "telegram://send?text=Welcome!")
.when("$.command=='/help'", "telegram://send?text=Available commands...")
.otherwise("telegram://send?text=Unknown command")
)
# Process messages
for update in bot_pipeline.stream():
print(f"Received: {update}")
WhatsApp component supports multiple providers:
# Twilio WhatsApp
flow("whatsapp://send"
"?provider=twilio"
"&account_sid=YOUR_SID"
"&auth_token=YOUR_TOKEN"
"&from_number=+14155238886" # Twilio WhatsApp number
).run({"phone": "+1234567890", "message": "Hello!"})
# WhatsApp Business API
flow("whatsapp://send"
"?provider=business_api"
"&token=YOUR_TOKEN"
"&phone_number_id=YOUR_PHONE_ID"
).run({"phone": "+1234567890", "message": "Hello!"})
# Send template message
flow("whatsapp://template"
"?template=order_confirmation"
"&language=en"
).run({
"phone": "+1234567890",
"parameters": ["ORDER123", "$99.99", "2 days"]
})
# Discord bot token from Discord Developer Portal
flow("discord://send?channel_id=123456&token=BOT_TOKEN").run("Message")
| Operation | Description | URI Example |
|---|---|---|
send |
Send message | discord://send?channel_id=123456 |
send_embed |
Send embedded message | discord://send_embed?channel_id=123456 |
webhook |
Send via webhook | discord://webhook?url=WEBHOOK_URL |
create_thread |
Create thread | discord://create_thread?channel_id=123456 |
# Send rich embed
flow("discord://send_embed?channel_id=123456").with_data({
"title": "📊 Daily Report",
"description": "Here's your daily summary",
"color": 0x00ff00,
"fields": [
{"name": "Sales", "value": "$10,000", "inline": True},
{"name": "Users", "value": "1,234", "inline": True}
],
"footer": "Generated at " + str(datetime.now())
}).run()
# Slack bot token from Slack App
flow("slack://send?channel=general&token=xoxb-YOUR-TOKEN").run("Message")
| Operation | Description | URI Example |
|---|---|---|
send |
Send message | slack://send?channel=general |
upload |
Upload file | slack://upload?channel=general |
get_users |
List users | slack://get_users |
search |
Search messages | slack://search?query=important |
create_channel |
Create channel | slack://create_channel?name=new-channel |
# Send interactive message with buttons
flow("slack://send?channel=approvals").with_data({
"text": "Approval Required",
"blocks": [
{
"type": "section",
"text": {"type": "mrkdwn", "text": "*Request:* New laptop\n*Cost:* $2000"}
},
{
"type": "actions",
"elements": [
{"type": "button", "text": {"type": "plain_text", "text": "✅ Approve"}},
{"type": "button", "text": {"type": "plain_text", "text": "❌ Deny"}}
]
}
]
}).run()
SMS component supports:
# Twilio SMS
flow("sms://send"
"?provider=twilio"
"&account_sid=YOUR_SID"
"&auth_token=YOUR_TOKEN"
"&from_number=+1234567890"
).run({"to": "+0987654321", "message": "Hello!"})
# Vonage SMS
flow("sms://send"
"?provider=vonage"
"&api_key=YOUR_KEY"
"&api_secret=YOUR_SECRET"
).run({"to": "+1234567890", "message": "Hello!"})
# Send verification code
result = flow("sms://verify?provider=twilio").run({
"phone": "+1234567890"
})
print(f"Verification code: {result['verification_code']}")
# Bulk SMS
flow("sms://bulk"
"?provider=twilio"
"&numbers=+123,+456,+789"
).run("Broadcast message")
# Unified support ticket system
support_flow = (
flow("multicast://parallel=true")
.destinations([
"email-watch://folder=support",
"telegram-bot://token=SUPPORT_BOT",
"slack-events://channel=support"
])
| "transform://normalize_request"
| "postgres://insert?table=tickets"
| "choose://"
.when("$.priority=='high'", [
"sms://send?to=",
"slack://send?channel=urgent"
])
.otherwise("email://send?to=support@company.com")
)
# Send order confirmation across all channels
order_notification = (
flow("kafka://consume?topic=orders")
| "transform://json"
| "multicast://parallel=true"
.destinations([
"email://send?template=order_confirmation",
"sms://send?message=Order confirmed!",
"whatsapp://template?template=order_confirm",
"telegram://send?text=✅ Order confirmed!"
])
)
# Multi-tier escalation
escalation = (
flow("prometheus://alerts")
| "filter://severity==critical"
| "slack://send?channel=incidents"
| "wait://minutes=5"
| "choose://"
.when("$.acknowledged==false",
"sms://send?to=")
| "wait://minutes=10"
| "choose://"
.when("$.acknowledged==false",
"phone://call?to=")
)
# Personalized multi-channel campaign
campaign = (
flow("postgres://query?sql=SELECT * FROM customers")
| "split://"
| "enrich://preferences"
| "choose://"
.when("$.channel=='email'",
"email://send?template=campaign")
.when("$.channel=='sms'",
"sms://send?message=")
.when("$.channel=='whatsapp'",
"whatsapp://template?template=promotion")
)
# Implement fallback channels
notification = (
flow("email://send?to=")
.on_error("sms://send?to=")
.on_error("slack://send?channel=failures")
)
# Respect API rate limits
flow("sms://bulk") \
.with_rate_limit(messages_per_second=1) \
.run(phone_numbers)
# Use templates for consistent messaging
templates = {
"welcome": "Welcome ! Your account is ready.",
"alert": "⚠️ Alert: at "
}
flow("multicast://") \
.destinations([
f"email://send?template={template}",
f"sms://send?template={template}"
])
# Track delivery status
result = flow("sms://send").run(message)
status = flow("sms://status").run(result['message_id'])
# Verify webhook signatures
flow("webhook://path=/telegram") \
| "verify://signature?secret=" \
| "telegram-webhook://"
import os
# Store sensitive data in environment
flow(f"telegram://send"
f"?token={os.getenv('TELEGRAM_BOT_TOKEN')}"
f"&chat_id={os.getenv('TELEGRAM_CHAT_ID')}"
)
# config.yaml
communication:
email:
smtp_host: smtp.gmail.com
smtp_port: 587
user: ${EMAIL_USER}
password: ${EMAIL_PASSWORD}
telegram:
bot_token: ${TELEGRAM_TOKEN}
twilio:
account_sid: ${TWILIO_SID}
auth_token: ${TWILIO_TOKEN}
# Reuse connections for better performance
email_pool = flow("email://pool?max_connections=10")
for message in messages:
email_pool.send(message)
from unittest.mock import patch
@patch('requests.post')
def test_telegram_send(mock_post):
mock_post.return_value.json.return_value = {"ok": True}
result = flow("telegram://send?chat_id=123").run("Test")
assert result["ok"] == True
# Test with real services (use test accounts)
def test_multi_channel_integration():
test_message = "Integration test at " + str(datetime.now())
results = flow("multicast://").destinations([
"slack://send?channel=test",
"email://send?to=test@example.com"
]).run(test_message)
assert all(r["success"] for r in results)
For issues and questions: