> ## Documentation Index
> Fetch the complete documentation index at: https://upstash-fix-issues-on-docs.mintlify.site/llms.txt
> Use this file to discover all available pages before exploring further.

# Kafka API

Upstash uses [Apache Kafka](https://kafka.apache.org/) for deployments and
provides a serverless Kafka cluster access using both native Kafka clients (over
TCP) and REST API (over HTTP). As a consequence of this flexible model, there
are some restrictions when using
[Kafka protocol](https://kafka.apache.org/protocol), mainly for administrative
Kafka APIs.

Currently following [Kafka Protocol APIs](https://kafka.apache.org/protocol) are
supported by Upstash:

| NAME            | KEY | NAME                 | KEY | NAME                    | KEY |
| --------------- | :-: | -------------------- | :-: | ----------------------- | :-: |
| Produce         |  0  | DescribeGroups       |  15 | EndTxn                  |  26 |
| Fetch           |  1  | ListGroups           |  16 | TxnOffsetCommit         |  28 |
| ListOffsets     |  2  | SaslHandshake        |  17 | DescribeConfigs         |  32 |
| Metadata        |  3  | ApiVersions          |  18 | AlterConfigs            |  33 |
| OffsetCommit    |  8  | CreateTopics         |  19 | DescribeLogDirs         |  35 |
| OffsetFetch     |  9  | DeleteTopics         |  20 | SaslAuthenticate        |  36 |
| FindCoordinator |  10 | DeleteRecords        |  21 | CreatePartitions        |  37 |
| JoinGroup       |  11 | InitProducerId       |  22 | DeleteGroups            |  42 |
| Heartbeat       |  12 | OffsetForLeaderEpoch |  23 | IncrementalAlterConfigs |  44 |
| LeaveGroup      |  13 | AddPartitionsToTxn   |  24 | OffsetDelete            |  47 |
| SyncGroup       |  14 | AddOffsetsToTxn      |  25 | DescribeCluster         |  60 |

<Note>
  Some of the unsupported Kafka APIs are in our roadmap to make them available.
  If you need an API that we do not support at the moment, please drop a note to
  [support@upstash.com](mailto:support@upstash.com). So we can inform you when
  we are planning to support it.
</Note>

## Connect Using Kafka Clients

Connecting to Upstash Kafka using any Kafka client is very straightforward. If
you do not have a Kafka cluster and/or topic already, follow
[these steps](../overall/getstarted) to create one.

After creating a cluster and a topic, just go to cluster details page on the
[Upstash Console](https://console.upstash.com) and copy bootstrap endpoint,
username and password.

<Frame>
  <img src="https://mintlify.s3-us-west-1.amazonaws.com/upstash-fix-issues-on-docs/img/kafka/getting_started/cluster-detail.png" />
</Frame>

Then replace following parameters in the code snippets of your favourite Kafka
client or language below.

* `{{ BOOTSTRAP_ENDPOINT }}`
* `{{ UPSTASH_KAFKA_USERNAME }}`
* `{{ UPSTASH_KAFKA_PASSWORD }}`
* `{{ TOPIC_NAME }}`

## Create a Topic

<Tabs>
  <Tab title="Java">
    ```java
    class CreateTopic {
      public static void main(String[] args) throws Exception {
        var props = new Properties();
        props.put("bootstrap.servers", "{{ BOOTSTRAP_ENDPOINT }}");
        props.put("sasl.mechanism", "SCRAM-SHA-512");
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required " +
                    "username=\"{{ UPSTASH_KAFKA_USERNAME }}\" " +
                    "password=\"{{ UPSTASH_KAFKA_PASSWORD }}\";");

        try (var admin = Admin.create(props)) {
            admin.createTopics(
                Set.of(new NewTopic("{{ TOPIC_NAME }}", partitions, replicationFactor))
              ).all().get();
        }
      }
    }
    ```
  </Tab>

  <Tab title="Node.js">
    ```typescript
    const { Kafka } = require("kafkajs");

    const kafka = new Kafka({
      brokers: ["{{ BOOTSTRAP_ENDPOINT }}"],
      sasl: {
        mechanism: "scram-sha-512",
        username: "{{ UPSTASH_KAFKA_USERNAME }}",
        password: "{{ UPSTASH_KAFKA_PASSWORD }}",
      },
      ssl: true,
    });

    const admin = kafka.admin();

    const createTopic = async () => {
      await admin.connect();
      await admin.createTopics({
        validateOnly: false,
        waitForLeaders: true,
        topics: [
          {
            topic: "{{ TOPIC_NAME }}",
            numPartitions: partitions,
            replicationFactor: replicationFactor,
          },
        ],
      });
      await admin.disconnect();
    };
    createTopic();
    ```
  </Tab>

  <Tab title="Python">
    ```py
    from kafka import KafkaAdminClient
    from kafka.admin import NewTopic

    admin = KafkaAdminClient(
      bootstrap_servers=['{{ BOOTSTRAP_ENDPOINT }}'],
      sasl_mechanism='SCRAM-SHA-512',
      security_protocol='SASL_SSL',
      sasl_plain_username='{{ UPSTASH_KAFKA_USERNAME }}',
      sasl_plain_password='{{ UPSTASH_KAFKA_PASSWORD }}',
    )

    admin.create_topics([NewTopic(name='{{ TOPIC_NAME }}', num_partitions=partitions, replication_factor=replicationFactor)])
    admin.close()
    ```
  </Tab>

  <Tab title="Go">
    ```go
    import (
            "context"
            "crypto/tls"
            "log"

            "github.com/segmentio/kafka-go"
            "github.com/segmentio/kafka-go/sasl/scram"
    )

    func main() {
      mechanism, err := scram.Mechanism(scram.SHA512,
          "{{ UPSTASH_KAFKA_USERNAME }}", "{{ UPSTASH_KAFKA_PASSWORD }}")
      if err != nil {
        log.Fatalln(err)
      }

      dialer := &kafka.Dialer{
        SASLMechanism: mechanism,
        TLS:           &tls.Config{},
      }

      conn, err := dialer.Dial("tcp", "{{ BOOTSTRAP_ENDPOINT }}")
      if err != nil {
          log.Fatalln(err)
      }
      defer conn.Close()

      controller, err := conn.Controller()
      if err != nil {
          log.Fatalln(err)
      }

      controllerConn, err := dialer.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
      if err != nil {
          log.Fatalln(err)
      }
      defer controllerConn.Close()

      err = controllerConn.CreateTopics(kafka.TopicConfig{
                    Topic:             "{{ TOPIC_NAME }}",
                    NumPartitions:     partitions,
                    ReplicationFactor: replicationFactor,
            })
      if err != nil {
          log.Fatalln(err)
      }
    }
    ```
  </Tab>
</Tabs>

## Produce a Message

<Tabs groupId="kafka-clients">
  <Tab title="Java">
    ```java
    class Produce {
      public static void main(String[] args) throws Exception {
        var props = new Properties();
        props.put("bootstrap.servers", "{{ BOOTSTRAP_ENDPOINT }}");
        props.put("sasl.mechanism", "SCRAM-SHA-512");
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required " +
                    "username=\"{{ UPSTASH_KAFKA_USERNAME }}\" " +
                    "password=\"{{ UPSTASH_KAFKA_PASSWORD }}\";");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (var producer = new KafkaProducer<String, String>(props)) {
            producer.send(new ProducerRecord<>("{{ TOPIC_NAME }}", "Hello Upstash!"));
            producer.flush();
        }
      }
    }
    ```
  </Tab>

  <Tab title="Node.js">
    ```typescript
    const { Kafka } = require("kafkajs");

    const kafka = new Kafka({
      brokers: ["{{ BOOTSTRAP_ENDPOINT }}"],
      sasl: {
        mechanism: "scram-sha-512",
        username: "{{ UPSTASH_KAFKA_USERNAME }}",
        password: "{{ UPSTASH_KAFKA_PASSWORD }}",
      },
      ssl: true,
    });

    const producer = kafka.producer();

    const produce = async () => {
      await producer.connect();
      await producer.send({
        topic: "{{ TOPIC_NAME }}",
        messages: [{ value: "Hello Upstash!" }],
      });
      await producer.disconnect();
    };
    produce();
    ```
  </Tab>

  <Tab title="Python">
    ```py
    from kafka import KafkaProducer

    producer = KafkaProducer(
      bootstrap_servers=['{{ BOOTSTRAP_ENDPOINT }}'],
      sasl_mechanism='SCRAM-SHA-512',
      security_protocol='SASL_SSL',
      sasl_plain_username='{{ UPSTASH_KAFKA_USERNAME }}',
      sasl_plain_password='{{ UPSTASH_KAFKA_PASSWORD }}',
    )

    future = producer.send('{{ TOPIC_NAME }}', b'Hello Upstash!')
    record_metadata = future.get(timeout=10)
    print (record_metadata)
    producer.close()
    ```
  </Tab>

  <Tab title="Go">
    ```go
    import (
            "context"
            "crypto/tls"
            "log"

            "github.com/segmentio/kafka-go"
            "github.com/segmentio/kafka-go/sasl/scram"
    )

    func main() {
      mechanism, err := scram.Mechanism(scram.SHA512,
          "{{ UPSTASH_KAFKA_USERNAME }}", "{{ UPSTASH_KAFKA_PASSWORD }}")
      if err != nil {
        log.Fatalln(err)
      }

      dialer := &kafka.Dialer{
        SASLMechanism: mechanism,
        TLS:           &tls.Config{},
      }

      w := kafka.NewWriter(kafka.WriterConfig{
        Brokers:  []string{"{{ BOOTSTRAP_ENDPOINT }}"},
        Topic:    "{{ TOPIC_NAME }}",
        Dialer:   dialer,
      })
      defer w.Close()

      err = w.WriteMessages(context.Background(),
        kafka.Message{
            Value: []byte("Hello Upstash!"),
        },
      )
      if err != nil {
        log.Fatalln("failed to write messages:", err)
      }
    }
    ```
  </Tab>
</Tabs>

## Consume Messages

<Tabs groupId="kafka-clients">
  <Tab title="Java">
    ```java
    class Consume {
      public static void main(String[] args) throws Exception {
        var props = new Properties();
        props.put("bootstrap.servers", "{{ BOOTSTRAP_ENDPOINT }}");
        props.put("sasl.mechanism", "SCRAM-SHA-512");
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required " +
                    "username=\"{{ UPSTASH_KAFKA_USERNAME }}\" " +
                    "password=\"{{ UPSTASH_KAFKA_PASSWORD }}\";");

        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest");
        props.put("group.id", "{{ GROUP_NAME }}");

        try(var consumer = new KafkaConsumer<String, String>(props)) {
            consumer.subscribe(Collections.singleton("{{ TOPIC_NAME }}"));
            var records = consumer.poll(Duration.ofSeconds(10));
            for (var record : records) {
                System.out.println(record);
            }
        }
      }
    }
    ```
  </Tab>

  <Tab title="Node.js">
    ```typescript
    const { Kafka } = require("kafkajs");

    const kafka = new Kafka({
      brokers: ["{{ BOOTSTRAP_ENDPOINT }}"],
      sasl: {
        mechanism: "scram-sha-512",
        username: "{{ UPSTASH_KAFKA_USERNAME }}",
        password: "{{ UPSTASH_KAFKA_PASSWORD }}",
      },
      ssl: true,
    });

    const consumer = kafka.consumer({ groupId: "{{ GROUP_NAME }}" });
    const consume = async () => {
      await consumer.connect();
      await consumer.subscribe({ topic: "{{ TOPIC_NAME }}", fromBeginning: true });

      await consumer.run({
        eachMessage: async ({ topic, partition, message }) => {
          console.log({
            topic: topic,
            partition: partition,
            message: JSON.stringify(message),
          });
        },
      });
    };
    consume();
    ```
  </Tab>

  <Tab title="Python">
    ```py
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
      bootstrap_servers=['{{ BOOTSTRAP_ENDPOINT }}'],
      sasl_mechanism='SCRAM-SHA-512',
      security_protocol='SASL_SSL',
      sasl_plain_username='{{ UPSTASH_KAFKA_USERNAME }}',
      sasl_plain_password='{{ UPSTASH_KAFKA_PASSWORD }}',
      group_id='{{ GROUP_NAME }}',
      auto_offset_reset='earliest',
    )

    consumer.subscribe(['{{ TOPIC_NAME }}'])
    records = consumer.poll(timeout_ms=10000)
    print(records)
    consumer.close()
    ```
  </Tab>

  <Tab title="Go">
    ```go
    import (
            "context"
            "crypto/tls"
            "log"
            "time"

            "github.com/segmentio/kafka-go"
            "github.com/segmentio/kafka-go/sasl/scram"
    )

    func main() {
      mechanism, err := scram.Mechanism(scram.SHA512,
          "{{ UPSTASH_KAFKA_USERNAME }}", "{{ UPSTASH_KAFKA_PASSWORD }}")
      if err != nil {
        log.Fatalln(err)
      }

      dialer := &kafka.Dialer{
        SASLMechanism: mechanism,
        TLS:           &tls.Config{},
      }

      r := kafka.NewReader(kafka.ReaderConfig{
        Brokers:  []string{"{{ BOOTSTRAP_ENDPOINT }}"},
        GroupID: "{{ GROUP_NAME }}",
        Topic:   "{{ TOPIC_NAME }}",
        Dialer:  dialer,
      })
      defer r.Close()

      ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
      defer cancel()

      m, err := r.ReadMessage(ctx)
      if err != nil {
        log.Fatalln(err)
      }
      log.Printf("%+v\n", m)
    }
    ```
  </Tab>
</Tabs>
