> ## Documentation Index
> Fetch the complete documentation index at: https://upstash-fix-issues-on-docs.mintlify.site/llms.txt
> Use this file to discover all available pages before exploring further.

# Create Kafka Connector

> This endpoint creates a new kafka connector in a cluster.

## Request Parameters

<ParamField body="name" type="string" required>
  Name of the new kafka topic
</ParamField>

<ParamField body="cluster_id" type="string" required>
  ID of the cluster the topic will be deployed in
</ParamField>

<ParamField body="properties" type="Object" required>
  Properties of the connector. Custom config for different types of connectors.
</ParamField>

## Response Parameters

<ResponseField name="connector_id" type="string">
  ID of the new kafka connector
</ResponseField>

<ResponseField name="name" type="string">
  Name of the new kafka connector
</ResponseField>

<ResponseField name="customer_id" type="string">
  Owner of the connector
</ResponseField>

<ResponseField name="cluster_id" type="string">
  ID of the kafka cluster of the connector
</ResponseField>

<ResponseField name="creation_time" type="int">
  Creation time of the topic
</ResponseField>

<ResponseField name="deletion_time" type="int">
  Creation time of the topic
</ResponseField>

<ResponseField name="state" type="string">
  State of the connector
</ResponseField>

<ResponseField name="state_error_message" type="string">
  Error message, if the connector failed
</ResponseField>

<ResponseField name="connector_state" type="string">
  State of the connector
</ResponseField>

<ResponseField name="tasks" type="object">
  Tasks for the connector
</ResponseField>

<ResponseField name="topics" type="string">
  Topics that are given with properties config
</ResponseField>

<ResponseField name="connector_class" type="string">
  Class of the created connector
</ResponseField>

<ResponseField name="encoded_username" type="string">
  Encoded username for the connector
</ResponseField>

<ResponseField name="TTL" type="int">
  Time to live for connector
</ResponseField>

<RequestExample>
  ```shell curl
  curl -X POST \
    https://api.upstash.com/v2/kafka/connector \
    -u 'EMAIL:API_KEY' \
    -d '{"name":"connectorName","cluster_id":"7568431c-88d5-4409-a808-2167f22a7133", "properties":{"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector","connection.uri": "connection-uri"}}'
  ```

  ```python Python
  import requests

  data = '{"name":"connectorName","cluster_id":"7568431c-88d5-4409-a808-2167f22a7133", "properties":{"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector","connection.uri": "connection-uri"}}'

  response = requests.post('https://api.upstash.com/v2/kafka/connector', data=data, auth=('EMAIL', 'API_KEY'))
  response.content
  ```

  ```go Go
  client := &http.Client{}
  var data = strings.NewReader(`{
      "name": "connectorName",
      "cluster_id": "7568431c-88d5-4409-a808-2167f22a7133",
      "properties":{"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector","connection.uri": "connection-uri"}
  }`)
  req, err := http.NewRequest("POST", "https://api.upstash.com/v2/kafka/connector", data)
  if err != nil {
      log.Fatal(err)
  }
  req.SetBasicAuth("email", "api_key")
  resp, err := client.Do(req)
  if err != nil {
      log.Fatal(err)
  }
  bodyText, err := ioutil.ReadAll(resp.Body)
  if err != nil {
      log.Fatal(err)
  }
  fmt.Printf("%s\n", bodyText);
  ```
</RequestExample>

<ResponseExample>
  ```json 200 OK
  {
    "connector_id":"431ec970-b59d-4b00-95fe-5f3abcc52c2f",
    "name":"connectorName",
    "customer_id":"EMAIL",
    "cluster_id":"7568431c-88d5-4409-a808-2167f22a7133",
    "creation_time":1684369147,
    "deletion_time":0,
    "state":"pending",
    "state_error_message":"",
    "connector_state":"",
    "tasks":[],
    "topics":[],
    "connector_class":"com.mongodb.kafka.connect.MongoSourceConnector",
    "encoded_username":"YXBwYXJlbnQta2l0ZS0xMTMwMiTIqFhTItzgDdE56au6LgnnbtlN7ITzh4QATDw",
    "TTL":1684370947
  }
  ```
</ResponseExample>
