Documentation ¶
Index ¶
- Constants
- Variables
- func GetSerdes(schemaType srclient.SchemaType) (Serdes, *Xk6KafkaError)
- type AvroSerde
- type BalancerKeyFunc
- type BasicAuth
- type ByteArraySerde
- type ConnectionConfig
- type ConsumeConfig
- type Container
- type Duration
- type Element
- type JKS
- type JKSConfig
- type JSONSerde
- type Kafka
- type Message
- type Module
- type ProduceConfig
- type ReaderConfig
- type RootModule
- type SASLConfig
- type Schema
- type SchemaRegistryConfig
- type Serdes
- type StringSerde
- type SubjectNameConfig
- type TLSConfig
- type WireFormat
- type WriterConfig
- type Xk6KafkaError
- func GetDialer(saslConfig SASLConfig, tlsConfig TLSConfig) (*kafkago.Dialer, *Xk6KafkaError)
- func GetSASLMechanism(saslConfig SASLConfig) (sasl.Mechanism, *Xk6KafkaError)
- func GetTLSConfig(tlsConfig TLSConfig) (*tls.Config, *Xk6KafkaError)
- func NewXk6KafkaError(code errCode, msg string, originalErr error) *Xk6KafkaError
Constants ¶
const (
    Key                Element = "key"
    Value              Element = "value"
    MagicPrefixSize    int     = 5
    ConcurrentRequests int     = 16
)
const (
    TopicNameStrategy       string = "TopicNameStrategy"
    RecordNameStrategy      string = "RecordNameStrategy"
    TopicRecordNameStrategy string = "TopicRecordNameStrategy"
)
const (
Bytes srclient.SchemaType = "BYTES"
)
const (
String srclient.SchemaType = "STRING"
)
const (
Timeout = time.Second * 10
)
Variables ¶
var (
    // ErrCannotConvertToByte is returned when a value cannot be converted to byte.
    ErrCannotConvertToByte = errors.New("cannot convert value to byte")
    // ErrCannotConvertToInt32 is returned when a float64 cannot be converted to int32.
    ErrCannotConvertToInt32 = errors.New("cannot convert float64 to int32: not an integer")
    // ErrCannotConvertToInt64 is returned when a float64 cannot be converted to int64.
    ErrCannotConvertToInt64 = errors.New("cannot convert float64 to int64: not an integer")
)
var (
    // ErrUnsupportedOperation is the error returned when the operation is not supported.
    ErrUnsupportedOperation = NewXk6KafkaError(unsupportedOperation, "Operation not supported", nil)
    // ErrForbiddenInInitContext is used when a Kafka producer was used in the init context.
    ErrForbiddenInInitContext = NewXk6KafkaError(
        kafkaForbiddenInInitContext,
        "Producing Kafka messages in the init context is not supported",
        nil)
    // ErrInvalidDataType is used when a data type is not supported.
    ErrInvalidDataType = NewXk6KafkaError(
        invalidDataType,
        "Invalid data type provided for serializer/deserializer",
        nil)
    // ErrInvalidSchema is used when a schema is not supported or is malformed.
    ErrInvalidSchema = NewXk6KafkaError(failedUnmarshalSchema, "Failed to unmarshal schema", nil)
    // ErrFailedTypeCast is used when a type cast failed.
    ErrFailedTypeCast = NewXk6KafkaError(failedTypeCast, "Failed to cast type", nil)
    // ErrUnknownSerdesType is used when a serdes type is not supported.
    ErrUnknownSerdesType = NewXk6KafkaError(invalidSerdeType, "Unknown serdes type", nil)
    ErrPartitionAndGroupID = NewXk6KafkaError(
        partitionAndGroupID,
        "Partition and groupID cannot be set at the same time",
        nil)
    ErrTopicAndGroupID = NewXk6KafkaError(
        topicAndGroupID,
        "When you specify groupID, you must set groupTopics instead of topic",
        nil)
    // ErrNotEnoughArguments is used when a function is called with too few arguments.
    ErrNotEnoughArguments = errors.New("not enough arguments")
    // ErrNoSchemaRegistryClient is used when a schema registry client is not configured correctly.
    ErrNoSchemaRegistryClient = NewXk6KafkaError(
        failedConfigureSchemaRegistryClient,
        "Failed to configure the schema registry client",
        nil)
    // ErrNoJKSConfig is used when a JKS config is not configured correctly.
    ErrNoJKSConfig = NewXk6KafkaError(failedConfigureJKS, "Failed to configure JKS", nil)
    // ErrInvalidPEMData is used when PEM data is invalid.
    ErrInvalidPEMData = errors.New("tls: failed to find any PEM data in certificate input")
    // ErrInvalidDuration is used when duration parsing fails.
    ErrInvalidDuration = errors.New("invalid duration")
    // ErrAvroMissingRequiredField is used when Avro encoding fails due to missing required field.
    ErrAvroMissingRequiredField = errors.New("avro: missing required field key")
    // ErrReferenceNotFound is returned when a schema reference cannot be found.
    ErrReferenceNotFound = errors.New("reference not found in registry")
    // ErrFailedParseReferencedSchema is returned when parsing a referenced schema fails.
    ErrFailedParseReferencedSchema = errors.New("failed to parse referenced schema: returned nil")
    // ErrFailedResolveReferences is returned when resolving schema references fails.
    ErrFailedResolveReferences = errors.New("failed to resolve references")
)
var (
    GroupBalancers  map[string]kafkago.GroupBalancer
    IsolationLevels map[string]kafkago.IsolationLevel

    // StartOffsets determines from whence the consumer group should begin
    // consuming when it finds a partition without a committed offset. If
    // non-zero, it must be set to one of FirstOffset or LastOffset.
    //
    // Default: FirstOffset
    //
    // Only used when GroupID is set
    // Ref: https://github.com/segmentio/kafka-go/blob/a8e5eabf4a90025a4ad2c28e929324d18db21103/reader.go#L481-L488
    StartOffsets map[string]int64

    RebalanceTimeout       = time.Second * 5
    HeartbeatInterval      = time.Second * 3
    SessionTimeout         = time.Second * 30
    PartitionWatchInterval = time.Second * 5
    JoinGroupBackoff       = time.Second * 5
    RetentionTime          = time.Hour * 24
)
var (
    // CompressionCodecs is a map of compression codec names to their respective codecs.
    CompressionCodecs map[string]compress.Compression
    // Balancers is a map of balancer names to their respective balancers.
    Balancers map[string]kafkago.Balancer
)
var TLSVersions map[string]uint16
TLSVersions is a map of TLS versions to their numeric values.
var TypesRegistry map[srclient.SchemaType]Serdes = map[srclient.SchemaType]Serdes{
    String:        &StringSerde{},
    Bytes:         &ByteArraySerde{},
    srclient.Json:  &JSONSerde{},
    srclient.Avro:  &AvroSerde{},
}
Functions ¶
func GetSerdes ¶ added in v0.14.0
func GetSerdes(schemaType srclient.SchemaType) (Serdes, *Xk6KafkaError)
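A minimal usage sketch for GetSerdes, assuming this package is imported as kafka from github.com/mostafa/xk6-kafka (the import path is an assumption, not part of this index); it round-trips a string through the registered StringSerde:

    package main

    import (
        "fmt"

        kafka "github.com/mostafa/xk6-kafka" // assumed import path for this package
    )

    func main() {
        // Look up the serde registered for the STRING schema type.
        serde, xerr := kafka.GetSerdes(kafka.String)
        if xerr != nil {
            panic(xerr)
        }

        // StringSerde ignores the schema argument, so nil is fine here.
        raw, xerr := serde.Serialize("hello kafka", nil)
        if xerr != nil {
            panic(xerr)
        }

        decoded, xerr := serde.Deserialize(raw, nil)
        if xerr != nil {
            panic(xerr)
        }
        fmt.Println(decoded) // expected to print: hello kafka
    }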
Types ¶
type AvroSerde ¶ added in v0.14.0
type AvroSerde struct {
Serdes
}
func (*AvroSerde) Deserialize ¶ added in v0.14.0
func (*AvroSerde) Deserialize(data []byte, schema *Schema) (any, *Xk6KafkaError)
Deserialize deserializes an Avro binary into a JSON object.
type BalancerKeyFunc ¶ added in v1.2.0
type ByteArraySerde ¶ added in v0.14.0
type ByteArraySerde struct {
Serdes
}
func (*ByteArraySerde) Deserialize ¶ added in v0.14.0
func (*ByteArraySerde) Deserialize(data []byte, _ *Schema) (any, *Xk6KafkaError)
Deserialize returns the data as-is, because it is already a byte array.
func (*ByteArraySerde) Serialize ¶ added in v0.14.0
func (*ByteArraySerde) Serialize(data any, _ *Schema) ([]byte, *Xk6KafkaError)
Serialize serializes the given data into a byte array.
type ConnectionConfig ¶ added in v0.12.0
type ConnectionConfig struct {
Address string `json:"address"`
SASL SASLConfig `json:"sasl"`
TLS TLSConfig `json:"tls"`
}
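For illustration, a ConnectionConfig for a local, unauthenticated broker might look like the sketch below; the address is a placeholder, and SASL and TLS are left at their zero values (their fields are not listed in this index):

    package main

    import (
        kafka "github.com/mostafa/xk6-kafka" // assumed import path for this package
    )

    func main() {
        conn := kafka.ConnectionConfig{
            Address: "localhost:9092",   // placeholder broker address
            SASL:    kafka.SASLConfig{}, // zero value: no authentication
            TLS:     kafka.TLSConfig{},  // zero value: no TLS
        }
        _ = conn // passed to the module's connection constructor (not shown in this index)
    }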
type ConsumeConfig ¶ added in v0.12.0
type Container ¶ added in v0.14.0
type Container struct {
Data any `json:"data"`
Schema *Schema `json:"schema"`
SchemaType srclient.SchemaType `json:"schemaType"`
}
type Duration ¶ added in v0.20.0
func (Duration) MarshalJSON ¶ added in v0.20.0
func (*Duration) UnmarshalJSON ¶ added in v0.20.0
type JSONSerde ¶ added in v0.14.0
type JSONSerde struct {
Serdes
}
func (*JSONSerde) Deserialize ¶ added in v0.14.0
func (*JSONSerde) Deserialize(data []byte, schema *Schema) (any, *Xk6KafkaError)
Deserialize deserializes a map from bytes so it can be exported as an object to JS.
type Message ¶ added in v0.12.0
type Message struct {
Topic string `json:"topic"`
// Setting Partition has no effect when writing messages.
Partition int `json:"partition"`
Offset int64 `json:"offset"`
HighWaterMark int64 `json:"highWaterMark"`
Key []byte `json:"key"`
Value []byte `json:"value"`
Headers map[string]any `json:"headers"`
// If not set at the creation, Time will be automatically set when
// writing the message.
Time time.Time `json:"time"`
}
type ProduceConfig ¶ added in v0.12.0
type ProduceConfig struct {
Messages []Message `json:"messages"`
}
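An illustrative sketch of a ProduceConfig wrapping a single Message; topic, key, value, and header contents are placeholders, and Key/Value are assumed to hold bytes already produced by one of the serdes above:

    package main

    import (
        "time"

        kafka "github.com/mostafa/xk6-kafka" // assumed import path for this package
    )

    func main() {
        msg := kafka.Message{
            Topic: "test-topic",       // placeholder topic
            Key:   []byte("key-1"),    // already-serialized key
            Value: []byte(`{"id":1}`), // already-serialized value
            Headers: map[string]any{
                "trace-id": []byte("abc123"), // header value assumed to be a byte slice
            },
            Time: time.Now(), // optional; set automatically when omitted
        }

        produceConfig := kafka.ProduceConfig{
            Messages: []kafka.Message{msg},
        }
        _ = produceConfig // passed to the module's produce call (not shown in this index)
    }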
type ReaderConfig ¶ added in v0.12.0
type ReaderConfig struct {
WatchPartitionChanges bool `json:"watchPartitionChanges"`
ConnectLogger bool `json:"connectLogger"`
Partition int `json:"partition"`
QueueCapacity int `json:"queueCapacity"`
MinBytes int `json:"minBytes"`
MaxBytes int `json:"maxBytes"`
MaxAttempts int `json:"maxAttempts"`
GroupID string `json:"groupId"`
Topic string `json:"topic"`
IsolationLevel string `json:"isolationLevel"`
StartOffset string `json:"startOffset"`
Offset int64 `json:"offset"`
Brokers []string `json:"brokers"`
GroupTopics []string `json:"groupTopics"`
GroupBalancers []string `json:"groupBalancers"`
MaxWait Duration `json:"maxWait"`
ReadBatchTimeout time.Duration `json:"readBatchTimeout"`
ReadLagInterval time.Duration `json:"readLagInterval"`
HeartbeatInterval time.Duration `json:"heartbeatInterval"`
CommitInterval time.Duration `json:"commitInterval"`
PartitionWatchInterval time.Duration `json:"partitionWatchInterval"`
SessionTimeout time.Duration `json:"sessionTimeout"`
RebalanceTimeout time.Duration `json:"rebalanceTimeout"`
JoinGroupBackoff time.Duration `json:"joinGroupBackoff"`
RetentionTime time.Duration `json:"retentionTime"`
ReadBackoffMin time.Duration `json:"readBackoffMin"`
ReadBackoffMax time.Duration `json:"readBackoffMax"`
OffsetOutOfRangeError bool `json:"offsetOutOfRangeError"` // deprecated, do not use
SASL SASLConfig `json:"sasl"`
TLS TLSConfig `json:"tls"`
}
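A sketch of a minimal consumer-group ReaderConfig; the broker, group ID, and topics are placeholders. GroupTopics is used instead of Topic because GroupID is set (see ErrTopicAndGroupID above):

    package main

    import (
        kafka "github.com/mostafa/xk6-kafka" // assumed import path for this package
    )

    func main() {
        readerConfig := kafka.ReaderConfig{
            Brokers:     []string{"localhost:9092"}, // placeholder broker
            GroupID:     "load-test-group",          // placeholder consumer group
            GroupTopics: []string{"test-topic"},     // with GroupID set, use GroupTopics, not Topic
            MinBytes:    1,
            MaxBytes:    10e6, // roughly 10 MB per fetch
        }
        _ = readerConfig // passed to the module's reader constructor (not shown in this index)
    }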
type RootModule ¶ added in v0.9.0
type RootModule struct{}
func (*RootModule) NewModuleInstance ¶ added in v0.9.0
func (*RootModule) NewModuleInstance(virtualUser modules.VU) modules.Instance
NewModuleInstance creates a new instance of the Kafka module.
type SASLConfig ¶ added in v0.11.0
type Schema ¶ added in v0.14.0
type Schema struct {
EnableCaching bool `json:"enableCaching"`
ID int `json:"id"`
Schema string `json:"schema"`
SchemaType *srclient.SchemaType `json:"schemaType"`
Version int `json:"version"`
References []srclient.Reference `json:"references"`
Subject string `json:"subject"`
// contains filtered or unexported fields
}
Schema is a wrapper around the schema registry schema. The Codec() and JSONSchema() methods return the respective codecs (duck-typing).
func (*Schema) Codec ¶ added in v0.14.0
func (s *Schema) Codec() avro.Schema
Codec ensures access to the parsed Avro schema. It tries to initialize a new one if it hasn't been initialized before, and returns nil if a schema can't be initialized from the schema string.
func (*Schema) JSONSchema ¶ added in v1.2.0
func (s *Schema) JSONSchema() *jsonschema.Schema
JSONSchema ensures access to the parsed JSON schema. It tries to initialize a new one if it hasn't been initialized before, and returns nil if a JSON schema can't be initialized from the schema.
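A sketch of constructing a Schema by hand and lazily parsing its Avro codec; the record definition is a made-up example, and the registry-related fields (ID, Version, Subject, References) are omitted:

    package main

    import (
        "fmt"

        kafka "github.com/mostafa/xk6-kafka" // assumed import path for this package
    )

    func main() {
        schema := kafka.Schema{
            Schema: `{"type":"record","name":"User","namespace":"example","fields":[{"name":"name","type":"string"}]}`,
        }

        // Codec parses the Avro schema string on first use and returns nil on failure.
        if codec := schema.Codec(); codec != nil {
            fmt.Println("parsed Avro schema")
        } else {
            fmt.Println("could not parse Avro schema")
        }
    }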
type SchemaRegistryConfig ¶ added in v0.14.0
type Serdes ¶ added in v0.14.0
type Serdes interface {
Serialize(data any, schema *Schema) ([]byte, *Xk6KafkaError)
Deserialize(data []byte, schema *Schema) (any, *Xk6KafkaError)
}
type StringSerde ¶ added in v0.14.0
type StringSerde struct {
Serdes
}
func (*StringSerde) Deserialize ¶ added in v0.14.0
func (*StringSerde) Deserialize(data []byte, _ *Schema) (any, *Xk6KafkaError)
Deserialize deserializes a string from bytes.
func (*StringSerde) Serialize ¶ added in v0.14.0
func (*StringSerde) Serialize(data any, _ *Schema) ([]byte, *Xk6KafkaError)
Serialize serializes a string to bytes.
type SubjectNameConfig ¶ added in v0.14.0
type WireFormat ¶ added in v0.14.0
type WriterConfig ¶ added in v0.12.0
type WriterConfig struct {
AutoCreateTopic bool `mapstructure:"autoCreateTopic"`
ConnectLogger bool `mapstructure:"connectLogger"`
MaxAttempts int `mapstructure:"maxAttempts"`
BatchSize int `mapstructure:"batchSize"`
BatchBytes int `mapstructure:"batchBytes"`
RequiredAcks int `mapstructure:"requiredAcks"`
Topic string `mapstructure:"topic"`
Balancer string `mapstructure:"-"`
BalancerFunc BalancerKeyFunc `mapstructure:"-"`
Compression string `mapstructure:"compression"`
Brokers []string `mapstructure:"brokers"`
BatchTimeout time.Duration `mapstructure:"batchTimeout"`
ReadTimeout time.Duration `mapstructure:"readTimeout"`
WriteTimeout time.Duration `mapstructure:"writeTimeout"`
SASL SASLConfig `mapstructure:"sasl"`
TLS TLSConfig `mapstructure:"tls"`
}
func (*WriterConfig) GetBalancer ¶ added in v1.2.0
func (c *WriterConfig) GetBalancer() kafkago.Balancer
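A sketch of a minimal WriterConfig; broker and topic are placeholders, Balancer/BalancerFunc are left unset (accepted balancer names come from the Balancers map above, whose keys are not listed in this index), and GetBalancer is assumed to fall back to a default balancer in that case:

    package main

    import (
        kafka "github.com/mostafa/xk6-kafka" // assumed import path for this package
    )

    func main() {
        writerConfig := kafka.WriterConfig{
            Brokers:         []string{"localhost:9092"}, // placeholder broker
            Topic:           "test-topic",               // placeholder topic
            AutoCreateTopic: true,
            RequiredAcks:    1, // leader acknowledgement only
        }

        // Resolve the balancer from BalancerFunc or the configured Balancer name.
        balancer := writerConfig.GetBalancer()
        _ = balancer // handed to the underlying kafka-go writer
    }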
type Xk6KafkaError ¶ added in v0.10.0
func GetDialer ¶ added in v0.11.0
func GetDialer(saslConfig SASLConfig, tlsConfig TLSConfig) (*kafkago.Dialer, *Xk6KafkaError)
GetDialer creates a Kafka dialer from the given SASL and TLS configs, or an unauthenticated dialer if no credentials are provided.
func GetSASLMechanism ¶ added in v0.11.0
func GetSASLMechanism(saslConfig SASLConfig) (sasl.Mechanism, *Xk6KafkaError)
GetSASLMechanism returns a Kafka SASL mechanism built from the given credentials.
func GetTLSConfig ¶ added in v0.10.0
func GetTLSConfig(tlsConfig TLSConfig) (*tls.Config, *Xk6KafkaError)
GetTLSConfig creates a TLS config from the given TLS config struct and checks for errors.
func NewXk6KafkaError ¶ added in v0.10.0
func NewXk6KafkaError(code errCode, msg string, originalErr error) *Xk6KafkaError
NewXk6KafkaError is the constructor for Xk6KafkaError.
func (Xk6KafkaError) Error ¶ added in v0.10.0
func (e Xk6KafkaError) Error() string
Error implements the `error` interface, so Xk6KafkaError values are normal Go errors.
func (Xk6KafkaError) Unwrap ¶ added in v0.10.0
func (e Xk6KafkaError) Unwrap() error
Unwrap implements the `xerrors.Wrapper` interface, so Xk6KafkaError values support error unwrapping with errors.Is and errors.As.
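A sketch of creating an unauthenticated dialer and handling Xk6KafkaError as a regular Go error; SASLConfig and TLSConfig are left at their zero values, which, per the description above, is assumed to yield an unauthenticated, plaintext dialer:

    package main

    import (
        "fmt"

        kafka "github.com/mostafa/xk6-kafka" // assumed import path for this package
    )

    func main() {
        dialer, xerr := kafka.GetDialer(kafka.SASLConfig{}, kafka.TLSConfig{})
        if xerr != nil {
            // Xk6KafkaError satisfies the error interface and supports Unwrap.
            fmt.Println("failed to create dialer:", xerr.Error(), "cause:", xerr.Unwrap())
            return
        }
        fmt.Printf("created dialer: %T\n", dialer)
    }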