Kafka

Connect your Apache Kafka clusters to enable Alex (Cloud Engineer) and Tony (Database Engineer) to monitor topic health, analyze consumer lag, and optimize streaming performance.

Supported Platforms

Platform | Support
--- | ---
Confluent Cloud | All tiers
Self-hosted Kafka | 2.8+ (KRaft mode), 3.x

Setup

Select your Kafka platform for specific connection instructions:
1. Open Confluent Cloud and pick your environment

Go to confluent.cloud/home, then open Environments. Click the environment you want to connect. The environment ID appears in the URL after you select it (for example, env-xxxxx).

Example navigation:
  • Environment list: https://confluent.cloud/environments
  • Selected environment URL pattern: https://confluent.cloud/environments/<env-id>/overview
2. Get Kafka cluster fields

Inside the selected environment, open Clusters and click your target cluster (for example, <cluster-name>).

Collect:
  • BOOTSTRAP_SERVERS
  • KAFKA_REST_ENDPOINT
  • KAFKA_CLUSTER_ID
Keep KAFKA_ENV_ID as the selected environment ID from Step 1.
3. Create scoped API keys and secrets

Go to confluent.cloud/settings/api-keys and click + Add API Key. Choose Service Account for production workloads, or My Account for development/testing. Select the desired scope in Confluent onboarding, then save the generated API key and API secret pair.

Scopes you may create keys for:
  • Kafka cluster
  • Schema Registry
  • ksqlDB cluster
  • Flink region
  • Cloud resource management
  • Tableflow
4. Get Schema Registry endpoint (optional)

In the selected environment, open Stream Governance -> Schema Registry.

Collect:
  • SCHEMA_REGISTRY_ENDPOINT
URL pattern example: https://confluent.cloud/environments/<env-id>/stream-governance/schema-registry/overview
5. Get Flink fields (optional)

In the selected environment, open Flink. Open Compute pools and create a pool with + Add compute pool if needed. Click the target compute pool and collect:
  • FLINK_COMPUTE_POOL_ID
  • FLINK_ENV_ID (same environment ID from URL)
URL pattern example: https://confluent.cloud/environments/<env-id>/flink/pools/<compute-pool-id>/overview

Set FLINK_REST_ENDPOINT from your cloud provider and region (AWS, Azure, or GCP; for example <region-code>).

6. Get organization ID (optional)

Go to confluent.cloud/settings/organizations/edit and collect:
  • FLINK_ORG_ID
7. Add connection in CloudThinker

In CloudThinker, navigate to Connections → Kafka. Create a JSON file with the fields for the scopes you enabled (see Connection Field Template below). Upload this JSON file in the connection form.

Required fields depend on your profile; see Profiles for details. You can leave optional scope fields empty and add them later.

Scope-Based Credential Model

Confluent Cloud uses scope-based API credentials. Each API key and secret pair grants access to a specific resource scope. You can start with Kafka-only fields, then add Schema Registry, Flink, Cloud API, or Tableflow fields later.

Scope | What It Unlocks | Typical Fields
--- | --- | ---
Kafka cluster | Manage topics (list, create, delete, configure), produce/consume messages, view cluster metadata | BOOTSTRAP_SERVERS, KAFKA_API_KEY, KAFKA_API_SECRET, KAFKA_CLUSTER_ID, KAFKA_ENV_ID, KAFKA_REST_ENDPOINT
Schema Registry | List, inspect, and delete data schemas | SCHEMA_REGISTRY_ENDPOINT, SCHEMA_REGISTRY_API_KEY, SCHEMA_REGISTRY_API_SECRET
Flink region | Create and manage Flink SQL statements, explore catalogs/databases/tables, health checks and diagnostics | FLINK_REST_ENDPOINT, FLINK_API_KEY, FLINK_API_SECRET, FLINK_COMPUTE_POOL_ID, FLINK_ENV_ID
Cloud resource management | Discover environments and clusters, query operational metrics and billing costs | CONFLUENT_CLOUD_API_KEY, CONFLUENT_CLOUD_API_SECRET
Tableflow | Manage Tableflow-enabled topics and catalog integrations (e.g., AWS Glue) | TABLEFLOW_API_KEY, TABLEFLOW_API_SECRET
Organization metadata | Organization-level context for Flink resource management | FLINK_ORG_ID

Profiles

Minimal (Kafka-only)

Required:
  • BOOTSTRAP_SERVERS
  • KAFKA_API_KEY
  • KAFKA_API_SECRET
  • KAFKA_CLUSTER_ID
  • KAFKA_ENV_ID
What you can do: Manage topics (list, create, delete, configure), produce and consume messages, view cluster metadata and topic configurations.

Standard (Kafka + Schema Registry + Cloud Management)

Add:
  • SCHEMA_REGISTRY_ENDPOINT
  • SCHEMA_REGISTRY_API_KEY
  • SCHEMA_REGISTRY_API_SECRET
  • CONFLUENT_CLOUD_API_KEY
  • CONFLUENT_CLOUD_API_SECRET
What you can do: Everything in Minimal, plus list and inspect data schemas, discover environments and clusters, query operational metrics, and view billing costs.

Add one or more optional scope groups as needed:
  • Flink: FLINK_REST_ENDPOINT, FLINK_API_KEY, FLINK_API_SECRET, FLINK_COMPUTE_POOL_ID, FLINK_ENV_ID
  • Tableflow: TABLEFLOW_API_KEY, TABLEFLOW_API_SECRET
What you can do: Everything in Standard, plus create and manage Flink SQL statements, explore Flink catalogs and databases, run health checks on streaming queries, and manage Tableflow-enabled topics with catalog integrations (e.g., AWS Glue).

Connection Field Template

Use this template and fill values for your enabled scopes:
{
  "BOOTSTRAP_SERVERS": "pkc-xxxxx.<region>.<provider>.confluent.cloud:9092",
  "KAFKA_API_KEY": "<kafka-api-key>",
  "KAFKA_API_SECRET": "<kafka-api-secret>",
  "KAFKA_REST_ENDPOINT": "https://pkc-xxxxx.<region>.<provider>.confluent.cloud:443",
  "KAFKA_CLUSTER_ID": "lkc-xxxxx",
  "KAFKA_ENV_ID": "env-xxxxx",

  "SCHEMA_REGISTRY_ENDPOINT": "https://psrc-xxxxx.<region>.<provider>.confluent.cloud",
  "SCHEMA_REGISTRY_API_KEY": "<schema-registry-api-key>",
  "SCHEMA_REGISTRY_API_SECRET": "<schema-registry-api-secret>",

  "FLINK_API_KEY": "<flink-api-key>",
  "FLINK_API_SECRET": "<flink-api-secret>",
  "FLINK_COMPUTE_POOL_ID": "lfcp-xxxxx",
  "FLINK_ENV_ID": "env-xxxxx",
  "FLINK_REST_ENDPOINT": "https://flink.<region>.<provider>.confluent.cloud",
  "FLINK_ORG_ID": "<org-id>",

  "CONFLUENT_CLOUD_API_KEY": "<cloud-api-key>",
  "CONFLUENT_CLOUD_API_SECRET": "<cloud-api-secret>",

  "TABLEFLOW_API_KEY": "<tableflow-api-key>",
  "TABLEFLOW_API_SECRET": "<tableflow-api-secret>"
}

Agent Capabilities

Once connected, Alex and Tony can:
Capability | Description
--- | ---
Consumer Lag Monitoring | Track lag per consumer group, identify slow consumers
Topic Health Analysis | Check partition distribution, replication factor, under-replicated partitions
Throughput Metrics | Monitor bytes in/out, message rates per topic
Broker Health | Track broker availability, ISR (In-Sync Replicas) status

Example Prompts

@alex check consumer lag for the orders-service group
@alex identify under-replicated partitions
@tony analyze message throughput trends for the events topic
@tony check data retention policies across all topics

Troubleshooting

  • Verify the Kafka broker process is running on <broker-name>.<your-domain>.
  • Check that the broker port (default 9092) is open and not blocked by a firewall.
  • Verify the bootstrap server address <broker-name>.<your-domain>:9092 is correct and reachable from CloudThinker.
  • For local development, ensure Kafka is bound to an accessible IP (not just 127.0.0.1).

When using partial scope onboarding, remove the entire key-value pair for unused scopes. Do not leave empty strings.

Correct (Kafka-only, Schema Registry removed entirely):
{
  "BOOTSTRAP_SERVERS": "pkc-xxxxx.<region>.<provider>.confluent.cloud:9092",
  "KAFKA_API_KEY": "<kafka-api-key>",
  "KAFKA_API_SECRET": "<kafka-api-secret>",
  "KAFKA_REST_ENDPOINT": "https://pkc-xxxxx.<region>.<provider>.confluent.cloud:443",
  "KAFKA_CLUSTER_ID": "lkc-xxxxx",
  "KAFKA_ENV_ID": "env-xxxxx"
}
Incorrect (empty string values cause validation errors):
{
  "BOOTSTRAP_SERVERS": "pkc-xxxxx.<region>.<provider>.confluent.cloud:9092",
  "KAFKA_API_KEY": "<kafka-api-key>",
  "KAFKA_API_SECRET": "<kafka-api-secret>",
  "KAFKA_REST_ENDPOINT": "https://pkc-xxxxx.<region>.<provider>.confluent.cloud:443",
  "KAFKA_CLUSTER_ID": "lkc-xxxxx",
  "KAFKA_ENV_ID": "env-xxxxx",
  "SCHEMA_REGISTRY_ENDPOINT": "",
  "SCHEMA_REGISTRY_API_KEY": ""
}
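
A pre-upload check for the connection JSON, added here as an example (not CloudThinker's own validator): it flags the empty-string values described above, leftover template placeholders, and missing Minimal profile fields. The https:// check and the all-fields-per-scope check are assumptions drawn from the template and the scope table; the sample values are constructed.

```python
import json

# Field groups copied from the page's Profiles and scope table.
REQUIRED = ["BOOTSTRAP_SERVERS", "KAFKA_API_KEY", "KAFKA_API_SECRET",
            "KAFKA_CLUSTER_ID", "KAFKA_ENV_ID"]
OPTIONAL_SCOPES = {
    "Schema Registry": ["SCHEMA_REGISTRY_ENDPOINT", "SCHEMA_REGISTRY_API_KEY",
                        "SCHEMA_REGISTRY_API_SECRET"],
    "Flink": ["FLINK_REST_ENDPOINT", "FLINK_API_KEY", "FLINK_API_SECRET",
              "FLINK_COMPUTE_POOL_ID", "FLINK_ENV_ID"],
    "Cloud resource management": ["CONFLUENT_CLOUD_API_KEY", "CONFLUENT_CLOUD_API_SECRET"],
    "Tableflow": ["TABLEFLOW_API_KEY", "TABLEFLOW_API_SECRET"],
}

def check_connection(text):
    """Return a list of problems found in a connection JSON document."""
    cfg = json.loads(text)
    problems = []
    for key in REQUIRED:
        if key not in cfg:
            problems.append(f"missing required field {key}")
    for key, value in cfg.items():
        if not isinstance(value, str) or not value.strip():
            problems.append(f"{key} is empty; remove the key-value pair instead")
        elif "<" in value and ">" in value:
            problems.append(f"{key} still holds a template placeholder")
        # Assumption: endpoints follow the template's https:// form.
        elif key.endswith("_ENDPOINT") and not value.startswith("https://"):
            problems.append(f"{key} is not an https:// URL")
    # Assumption: a scope is usable only when all of its fields are supplied.
    for scope, fields in OPTIONAL_SCOPES.items():
        present = [f for f in fields if f in cfg]
        if present and len(present) < len(fields):
            missing = sorted(set(fields) - set(present))
            problems.append(f"{scope} scope is partial, missing {missing}")
    return problems

# Constructed sample inputs (no real credentials).
KAFKA_ONLY = json.dumps({
    "BOOTSTRAP_SERVERS": "pkc-00000.region.provider.confluent.cloud:9092",
    "KAFKA_API_KEY": "EXAMPLEKEY", "KAFKA_API_SECRET": "example-secret",
    "KAFKA_REST_ENDPOINT": "https://pkc-00000.region.provider.confluent.cloud:443",
    "KAFKA_CLUSTER_ID": "lkc-00000", "KAFKA_ENV_ID": "env-00000"})
WITH_EMPTY = json.dumps({**json.loads(KAFKA_ONLY),
                         "SCHEMA_REGISTRY_ENDPOINT": "", "SCHEMA_REGISTRY_API_KEY": ""})

if __name__ == "__main__":
    for name, doc in (("kafka-only", KAFKA_ONLY), ("with-empty", WITH_EMPTY)):
        print(name, check_connection(doc) or "ok")
```

Running the check before upload also keeps a file with unfilled or half-filled credential fields from being submitted and then left on disk while the error is debugged.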

Security Best Practices

For Confluent Cloud

  • Network restrictions - Restrict Kafka access to CloudThinker IPs via security groups.
  • Secure credentials - Store secrets in a secure manager and rotate keys regularly.

For Self-hosted Kafka

  • Network restrictions - Restrict broker access to CloudThinker IPs via firewalls.
  • Private networks - Keep brokers in private subnets, not exposed to the public internet.

CloudThinker supports partial scope onboarding. If you only provide Kafka scope fields first, you can still create the connection and add Schema Registry, Flink, Cloud API, or Tableflow credentials later.
