Connector
A Connector in Nstream AI integrates streaming data sources and sinks with the platform. It supports streaming technologies such as Kafka.
Sink Connector
A Kafka-based sink connector configuration for writing data to a specific topic, authenticated over SASL_SSL (PLAIN mechanism) with JKS keystore and truststore files.
apiVersion: streams.nstream.ai/v1
kind: StreamConnector
metadata:
  name: "NAME"
  namespace: "NAMESPACE"
spec:
  streamConnectorTemplate:
    kafka:
      connectorConfig:
        groupId: "groupId"
        keyFormat: "string"
        valueFormat: "JSON"
        offsetResetStrategy: "EARLIEST"
      connectorParams:
        bootstrapUrl: "URL"
        topic: "TOPIC"
        securityProtocol: "SASL_SSL"
        saslMechanism: "PLAIN"
        saslSslConfig:
          jksKeystoreFile: "/path/to/keystore.jks"
          jksKeystorePassword: "keystore-password"
          jksTruststoreFile: "/path/to/truststore.jks"
          jksTruststorePassword: "truststore-password"
| Key | Description | Example |
|---|---|---|
| apiVersion | Defines the API version for connector deployment | streams.nstream.ai/v1 |
| kind | Specifies the type of connector being deployed | StreamConnector |
| name | The unique name of the connector instance | demo-sink-connector |
| groupId | Consumer group ID for Kafka | demo-sink-group |
| keyFormat | Format of the key | string |
| valueFormat | Format of the value | JSON |
| offsetResetStrategy | Offset reset strategy | EARLIEST |
| bootstrapUrl | Kafka bootstrap server URL | ns-data-system-kafka-bootstrap.kafka.svc.cluster.local:9092 |
| topic | Kafka topic name | ticker_demo_sink |
| securityProtocol | Security protocol used | SASL_SSL |
| saslMechanism | SASL mechanism used | PLAIN |
| jksKeystoreFile | Path to Keystore file | /path/to/keystore.jks |
| jksKeystorePassword | Password for Keystore file | keystore-password |
| jksTruststoreFile | Path to Truststore file | /path/to/truststore.jks |
| jksTruststorePassword | Password for Truststore file | truststore-password |
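As an illustration, the example values from the table above can be combined into one sink manifest. This is a sketch, not a verified deployment: the nesting of keys is inferred from the key names, and `namespace` stays a placeholder because the table gives no example value for it.

```yaml
# Sketch of a filled-in sink connector, using the example values from the table.
# Assumption: the indentation below reflects the intended nesting of the keys.
apiVersion: streams.nstream.ai/v1
kind: StreamConnector
metadata:
  name: "demo-sink-connector"
  namespace: "NAMESPACE"   # placeholder; no example value is given for the sink
spec:
  streamConnectorTemplate:
    kafka:
      connectorConfig:
        groupId: "demo-sink-group"
        keyFormat: "string"
        valueFormat: "JSON"
        offsetResetStrategy: "EARLIEST"
      connectorParams:
        bootstrapUrl: "ns-data-system-kafka-bootstrap.kafka.svc.cluster.local:9092"
        topic: "ticker_demo_sink"
        securityProtocol: "SASL_SSL"
        saslMechanism: "PLAIN"
        saslSslConfig:
          jksKeystoreFile: "/path/to/keystore.jks"
          jksKeystorePassword: "keystore-password"
          jksTruststoreFile: "/path/to/truststore.jks"
          jksTruststorePassword: "truststore-password"
```

The keystore and truststore paths and passwords are the placeholder examples from the table and would need to be replaced with real values.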
Source Connector
A Kafka-based source connector configuration for ingesting data from a Kafka topic into Nstream AI, including the schema definition for the incoming records.
apiVersion: streams.nstream.ai/v1
kind: StreamConnector
metadata:
  name: "NAME"
  namespace: "NAMESPACE"
spec:
  streamConnectorType: Kafka
  streamConnectorSchema:
    schemaId: "schemaId"
    definition:
      - key: "KEY"
        dataType: "DATATYPE"
        nullable: false
    watermarkColumn: "KEY"
    groupingColumn: ["KEY"]
    watermarkDuration: 5
    primaryKey: "PRIMARYKEY"
  streamConnectorTemplate:
    kafka:
      connectorConfig:
        groupId: "groupId"
        keyFormat: "string"
        valueFormat: "json"
        offsetResetStrategy: "LATEST"
        offsetStartValue:
          partition: 0
          offset: 0
      connectorParams:
        bootstrapUrl: "URL"
        topic: "TOPIC"
        securityProtocol: "NONE"
| Key | Description | Example |
|---|---|---|
| apiVersion | Defines the API version for connector deployment | streams.nstream.ai/v1 |
| kind | Specifies the type of connector being deployed | StreamConnector |
| name | The unique name of the connector instance | event-source-connector |
| namespace | Namespace where the connector is deployed | nstream |
| streamConnectorType | Type of streaming connector | Kafka |
| schemaId | ID of the schema used | example-schema-id |
| definition.key | Schema field key | event_data |
| definition.dataType | Data type of schema field | STRING |
| watermarkColumn | Column used for watermarking | datetime |
| groupingColumn | Column used for grouping | [event_data] |
| watermarkDuration | Watermark interval duration | 5 |
| primaryKey | Primary key field | event_data |
| groupId | Consumer group ID for Kafka | event-source-connector-consumer-group |
| keyFormat | Format of the key | string |
| valueFormat | Format of the value | json |
| offsetResetStrategy | Offset reset strategy | LATEST |
| offsetStartValue.partition | Partition to start reading from | 0 |
| offsetStartValue.offset | Offset value to start from | 0 |
| bootstrapUrl | Kafka bootstrap server URL | ns-data-system-kafka-bootstrap.kafka.svc.cluster.local:9092 |
| topic | Kafka topic name | ticker_demo_node_1 |
| securityProtocol | Security protocol used | NONE |
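For comparison with the template, the example values from the table above can be assembled into one source manifest. Treat this as a sketch under stated assumptions: the nesting is inferred from the dotted key names in the table (for example `offsetStartValue.partition`), and the schema lists only the single field the table provides. The table's `watermarkColumn` example, `datetime`, names a column that does not appear in `definition`, so a real schema would presumably define that field as well.

```yaml
# Sketch of a filled-in source connector, using the example values from the table.
# Assumption: the indentation below reflects the intended nesting of the keys.
apiVersion: streams.nstream.ai/v1
kind: StreamConnector
metadata:
  name: "event-source-connector"
  namespace: "nstream"
spec:
  streamConnectorType: Kafka
  streamConnectorSchema:
    schemaId: "example-schema-id"
    definition:
      - key: "event_data"
        dataType: "STRING"
        nullable: false
      # The table gives no definition entry for the "datetime" watermark column.
    watermarkColumn: "datetime"
    groupingColumn: ["event_data"]
    watermarkDuration: 5   # unit is not stated in the table
    primaryKey: "event_data"
  streamConnectorTemplate:
    kafka:
      connectorConfig:
        groupId: "event-source-connector-consumer-group"
        keyFormat: "string"
        valueFormat: "json"
        offsetResetStrategy: "LATEST"
        offsetStartValue:
          partition: 0
          offset: 0
      connectorParams:
        bootstrapUrl: "ns-data-system-kafka-bootstrap.kafka.svc.cluster.local:9092"
        topic: "ticker_demo_node_1"
        securityProtocol: "NONE"
```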