Three official SDKs maintained by the Integration Platform team. All feature automatic retry, full type coverage, and pagination helpers.
from integration_platform import IntegrationPlatform, from_env # Option 1: Explicit config client = IntegrationPlatform( api_key="ip_live_...", tenant_id="acme-corp", base_url="https://api.integration-platform.io" ) # Option 2: From environment variables # export IP_API_KEY=ip_live_... # export IP_TENANT_ID=acme-corp client = from_env() # Health check health = client.health() print(health["status"]) # UP
from integration_platform import IntegrationPlatform client = IntegrationPlatform(api_key="ip_live_...", tenant_id="acme") # Create SFTP connector sftp = client.connectors.create( name="SFTP Production", type="SFTP", config={"host": "sftp.empresa.pt", "port": 22, "remotePath": "/outbound"}, credentials={"username": "integrations", "password": "***"} ) print(f"Created: {sftp.id}") # Test connectivity tested = client.connectors.test(sftp.id) print(f"Status: {tested.status} — {tested.last_test_result}") # Paginate all connectors for conn in client.connectors.iter_all(): print(f" {conn.name} [{conn.type}] {conn.status}")
# Create integration + promote to production flow = client.integrations.create( name="SFTP → Salesforce Daily Sync", source_connector_id=sftp.id, target_connector_id="salesforce-prod-id", transformation_id="transform-uuid", error_strategy="RETRY_3_DLQ", max_throughput=100, tags=["sftp", "salesforce", "daily"] ) print(f"Created in {flow.environment}: {flow.id}") # Promote DEV → QA → PROD qa = client.integrations.promote(flow.id, "QA", notes="Initial QA test") print(f"QA: {qa.environment}") prod = client.integrations.promote(flow.id, "PROD", notes="Approved by ops") print(f"PROD: {prod.environment}") # Sandbox test before promoting result = client.sandbox.execute(flow.id, { "nome_completo": "Test User", "email": "test@example.pt" }) print(f"Sandbox: {result.status} in {result.duration_ms}ms") print(result.output_payload)
# Monitor DLQ stats = client.dlq.stats() print(f"Pending: {stats['pendingCount']}") # List PENDING entries pending = client.dlq.list(status="PENDING") for entry in pending: print(f" {entry.integration_name}: {entry.error_message}") # Bulk reprocess all pending ids = [e.id for e in pending] result = client.dlq.bulk_reprocess(ids) print(f"Reprocessed: {result['processed']} / {len(ids)}") # Discard with reason client.dlq.discard("entry-uuid", reason="Schema changed — data no longer valid")
import asyncio from integration_platform.async_client import AsyncIntegrationPlatform async def main(): client = AsyncIntegrationPlatform( api_key="ip_live_...", tenant_id="acme" ) # Parallel operations connectors, integrations, dlq_stats = await asyncio.gather( client.connectors.list(), client.integrations.list(), client.dlq.stats() ) print(f"Connectors: {connectors.total_elements}") print(f"Integrations: {integrations.total_elements}") print(f"DLQ pending: {dlq_stats['pendingCount']}") # Async generator pagination async for conn in client.connectors.iter_all(): print(f" {conn.name}") asyncio.run(main())
| Feature | Python SDK |
|---|---|
| Python version | 3.9+ |
| Async (asyncio) | ✓ Full support |
| Auto-retry | ✓ Exponential backoff |
| Typed responses (dataclasses) | ✓ |
| Pagination helpers | ✓ iter_all() generator |
| Streaming large payloads | ✓ |
| Dependencies | requests (sync) / httpx (async) |
import { IntegrationPlatform, fromEnv } from '@integration-platform/sdk'; // Explicit config const client = new IntegrationPlatform({ apiKey: 'ip_live_...', tenantId: 'acme-corp', baseUrl: 'https://api.integration-platform.io', timeout: 30_000, maxRetries: 3, }); // From env: IP_API_KEY, IP_TENANT_ID, IP_BASE_URL const client2 = fromEnv(); // Health check const { status } = await client.health(); console.log(status); // 'UP'
import { IntegrationPlatform, ConnectorType } from '@integration-platform/sdk'; const sftp = await client.connectors.create({ name: 'SFTP Production', type: 'SFTP', config: { host: 'sftp.empresa.pt', port: 22, remotePath: '/outbound' }, credentials: { username: 'integrations', password: '***' }, }); console.log(sftp.id, sftp.status); // Test connectivity const tested = await client.connectors.test(sftp.id); console.log(`Status: ${tested.status} — ${tested.lastTestResult}`); // Iterate all connectors (async generator) for await (const conn of client.connectors.iterAll()) { console.log(`${conn.name} [${conn.type}] ${conn.status}`); }
// Create and test integration const flow = await client.integrations.create({ name: 'SFTP → Salesforce Sync', sourceConnectorId: sftp.id, targetConnectorId: 'salesforce-prod-id', errorStrategy: 'RETRY_3_DLQ', tags: ['sftp', 'salesforce'], }); // Sandbox test const result = await client.sandbox.execute({ integrationId: flow.id, payload: { nome_completo: 'Test User', email: 'test@ex.pt' }, }); if (result.success) { // Promote to QA then PROD await client.integrations.promote(flow.id, 'QA'); await client.integrations.promote(flow.id, 'PROD', 'All tests passed'); console.log('Deployed to PROD!'); }
// Express webhook handler with signature verification import express from 'express'; import { verifyWebhookSignature } from '@integration-platform/sdk'; const app = express(); app.use(express.raw({ type: 'application/json' })); app.post('/webhook', (req, res) => { const signature = req.headers['x-ip-signature'] as string; const isValid = verifyWebhookSignature({ payload: req.body, signature, secret: process.env.IP_WEBHOOK_SECRET!, }); if (!isValid) return res.sendStatus(401); const event = JSON.parse(req.body); console.log(`Event: ${event.type}`); // dlq.entry.created, integration.failed res.sendStatus(200); });
import com.integrationplatform.sdk.IntegrationPlatformClient; // Builder initialisation IntegrationPlatformClient client = IntegrationPlatformClient .builder() .apiKey("ip_live_...") .tenantId("acme-corp") .baseUrl("https://api.integration-platform.io") .timeout(Duration.ofSeconds(30)) .build(); // From environment variables: IP_API_KEY, IP_TENANT_ID IntegrationPlatformClient client2 = IntegrationPlatformClient.fromEnv(); // Health check Map<String, Object> health = client.health(); System.out.println(health.get("status")); // UP
// Create connector with builder ConnectorDefinition sftp = client.connectors().create( CreateConnectorRequest.builder() .name("SFTP Production") .type(ConnectorType.SFTP) .config(Map.of( "host", "sftp.empresa.pt", "port", 22, "remotePath", "/outbound" )) .credentials(Map.of( "username", "integrations", "password", "***" )) .build() ); System.out.println("Created: " + sftp.id()); // Test and sandbox ConnectorDefinition tested = client.connectors().test(sftp.id()); System.out.println("Status: " + tested.status()); SandboxResult result = client.sandbox().execute( flow.id(), Map.of("email", "test@ex.pt", "ativo", true) ); System.out.println(result.success() ? "PASS" : "FAIL");
# application.yml integration-platform: api-key: ${IP_API_KEY} tenant-id: ${IP_TENANT_ID} base-url: https://api.integration-platform.io timeout: 30s // Auto-configured bean — just inject it: @Service @RequiredArgsConstructor public class MyIntegrationService { private final IntegrationPlatformClient ipClient; public void syncContacts() { SandboxResult result = ipClient.sandbox().execute( "integration-uuid", Map.of("email", "user@example.pt") ); if (!result.success()) { log.warn("Sync failed: {}", result.validationErrors()); } } }