kafkaStream
This object requires Streaming Data Framework for MATLAB® Production Server™.
Description
The kafkaStream function creates a KafkaStream object that connects to a Kafka® topic and reads and writes event streams from that topic.
An event consists of three parts:
Key — Identifies event source
Timestamp — Indicates time at which event occurred
Body — Contains event data specified as an unordered set of (name, value) pairs
After creating a KafkaStream object, use the readtimetable function to read the events into a timetable, or the writetimetable function to write a timetable to the stream.
readtimetable converts events into rows of a timetable. The names in the event body become the timetable column names, the value associated with each name becomes the column value in the event row, and the event timestamp becomes the row timestamp.
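The mapping can be sketched with core MATLAB functions. This sketch assumes a single hypothetical event whose body is a JSON string (the default BodyFormat) and builds the corresponding one-row timetable with jsondecode, struct2table, and table2timetable. It illustrates the idea only; it is not how readtimetable is implemented, and the event values are made up.

```matlab
% Hypothetical event: a timestamp plus a JSON body of (name, value) pairs.
% The values are illustrative, not read from a real stream.
eventTimestamp = datetime(2020,10,31,0,0,0);
eventBody = '{"vMotor":1.0909,"wMotor":0,"Tmass":25}';

% Each name in the body becomes a struct field...
fields = jsondecode(eventBody);

% ...then a table variable, and the event timestamp becomes the row time.
row = table2timetable(struct2table(fields),'RowTimes',eventTimestamp)
```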
writetimetable converts rows of a timetable into events in a stream.
Creation
Syntax
Description
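Row-Based Event Window
ks = kafkaStream(host,port,topic,Rows=numevents) creates a KafkaStream object that reads event windows containing numevents events.
Duration-Based Event Window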
ks = kafkaStream(host,port,topic,Duration=timespan) creates a KafkaStream object that reads stream events occurring during the specified timestamp span timespan.
Additional Options
ks = kafkaStream(___,Name=Value) specifies event stream options using one or more name-value arguments. You can also set properties using name-value arguments. Use these name-value arguments and properties to specify how events are converted to and from timetables.
Input Arguments
numevents
— Number of events in event window
50 (default) | positive integer
Number of events in the event window, specified as a positive integer.
Rows=numevents specifies the number of rows that a call to the readtimetable function returns. If fewer than the specified number of rows are available for reading, then readtimetable times out and returns an empty timetable.
readtimetable does not return until it processes all events in the window, so windows with many rows can block other processes from continuing. To prevent blocking, use the ReadLimit property to control how strictly reads adhere to the RequestTimeout period.
Example: Rows=500 specifies that each call to readtimetable returns a timetable with 500 rows.
Data Types: single | double | int8 | int16 | int32 | int64 | uint8 | uint16 | uint32 | uint64
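To picture row-based windows, the sketch below splits 25 hypothetical events into consecutive windows of numevents rows. It is a simplified model, not the framework's reader: it assumes all events are already in memory, and it treats the 5 leftover events as not enough to fill a window, which is the case in which readtimetable would time out and return an empty timetable.

```matlab
% Hypothetical stream contents: 25 events, one per second.
eventTime = datetime(2020,10,31,0,0,0) + seconds(0:24)';
vMotor = (1:25)';
allEvents = timetable(eventTime,vMotor);

numevents = 10;   % as in Rows=10

% Only complete windows of numevents rows are formed.
numWindows = floor(height(allEvents)/numevents);
windows = cell(numWindows,1);
for w = 1:numWindows
    rowIdx = (w-1)*numevents + (1:numevents);
    windows{w} = allEvents(rowIdx,:);
end

% Events that cannot fill another window.
leftover = height(allEvents) - numWindows*numevents
```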
timespan
— Timestamp span in event window
0 (default) | duration scalar
Timestamp span in the event window, specified as a duration scalar.
Duration=timespan determines the events that the readtimetable function returns based on their timestamps.
timespan specifies the difference between the last and first timestamps of events in the event window.
readtimetable does not return until it processes all events in the window, so windows with long durations can block other processes from continuing. To prevent blocking, use the ReadLimit property to control how strictly reads adhere to the RequestTimeout period.
Example: Duration=minutes(1) specifies that each call to readtimetable returns a timetable that has one minute's worth of events, where the timestamp of the last event is no more than one minute later than the timestamp of the first event.
Data Types: duration
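A simplified model of this rule: given hypothetical event timestamps already in event-time order, an event falls in the first window when its timestamp is no more than timespan after the first event's timestamp. The sketch ignores the grace period and how the framework chooses where later windows begin, so treat it as an illustration of the rule, not of the implementation.

```matlab
% Hypothetical event timestamps, already in event-time order.
t0 = datetime(2020,10,31,0,0,0);
eventTimes = t0 + seconds([0 20 45 59 61 90 130]);

timespan = minutes(1);   % as in Duration=minutes(1)

% An event belongs to the first window if its timestamp is no more than
% timespan later than the timestamp of the first event.
inFirstWindow = (eventTimes - eventTimes(1)) <= timespan;
firstWindow = eventTimes(inFirstWindow)
```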
propname
— Name of Kafka provider property
character vector | string scalar
Name of a Kafka provider property, specified as a character vector or string scalar. Use single or double quotes around propname. Kafka property names always contain at least one dot character, for example, retention.ms. For a list of Kafka properties, see the Kafka documentation: https://kafka.apache.org/documentation/#configuration.
The value of the property, propval, must follow the property name. Specify the property name and its corresponding value as a comma-separated pair.
Example: kafkaStream(host,port,topic,"security.protocol","SASL_SSL") sets the Kafka configuration property security.protocol to SASL_SSL.
propval
— Value of Kafka provider property
any supported MATLAB data type
Value of a Kafka provider property. For a list of Kafka properties and their values, see the Kafka documentation: https://kafka.apache.org/documentation/#configuration.
The value of the property must follow the property name propname. Specify the property name and its corresponding value as a comma-separated pair. You can specify propval as any supported MATLAB data type, but it must be possible to convert that value to a string.
Example: kafkaStream(host,port,topic,"sasl.mechanism","SCRAM-SHA-512") sets the value of the Kafka configuration property sasl.mechanism to SCRAM-SHA-512.
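When you pass several provider properties, it can help to collect the pairs in a cell array first. The following sketch uses hypothetical settings taken from the two examples above, plus a retention.ms value chosen only for illustration, and checks the two rules stated here: every name contains a dot, and every value converts to a string. The kafkaStream call is commented out because it needs a reachable Kafka server; expanding the cell array with {:} is ordinary MATLAB syntax, not a documented kafkaStream feature.

```matlab
% Hypothetical provider settings, stored as name-value pairs.
providerProps = {"security.protocol","SASL_SSL", ...
                 "sasl.mechanism","SCRAM-SHA-512", ...
                 "retention.ms",3600000};

% Concatenate the string-scalar names into a string array.
names = [providerProps{1:2:end}];

% Kafka property names always contain at least one dot.
assert(all(contains(names,".")),"Each Kafka property name needs a dot.")

% Each value must be convertible to a string.
values = strings(size(names));
for k = 1:numel(names)
    values(k) = string(providerProps{2*k});
end

% With a reachable Kafka server, the pairs could then be passed as:
% ks = kafkaStream(host,port,topic,providerProps{:});
```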
Specify optional pairs of arguments as Name1=Value1,...,NameN=ValueN, where Name is the argument name and Value is the corresponding value. Name-value arguments must appear after other arguments, but the order of the pairs does not matter.
GracePeriod
— Length of time to wait for messages
0 (default) | real scalar | duration scalar
Length of time, in GraceUnit units, to wait for messages in the requested event window, specified as a real scalar or duration scalar. The KafkaStream object waits until the end of the grace period to return events read from the stream. The GracePeriod and GraceUnit arguments together set the GracePeriod property.
This argument applies only for objects with duration-based event windows, that is, KafkaStream objects created using the timespan argument. For objects created using the numevents argument, the grace period is ignored.
Example: 10
Example: minutes(10)
Data Types: single | double | int8 | int16 | int32 | int64 | uint8 | uint16 | uint32 | uint64 | duration
GraceUnit
— Unit of time for grace period
"Seconds" (default) | "Milliseconds" | "Minutes" | "Hours" | "Days"
Unit of time for the grace period specified by the GracePeriod name-value argument, specified as one of these values:
"Milliseconds"
"Seconds"
"Minutes"
"Hours"
"Days"
The GracePeriod and GraceUnit arguments together set the GracePeriod property.
The KafkaStream object converts GracePeriod duration scalars to the units specified by GraceUnit. For example, suppose you specify a two-minute grace period using the minutes function but set the units to seconds. The GracePeriod property displays the grace period in seconds.
ks = kafkaStream(host,port,topic,Duration=minutes(10), ...
    GracePeriod=minutes(2),GraceUnit="Seconds")
ks = 
  KafkaStream with properties:
    ...
    GracePeriod: "120 Seconds"
    ...
This argument applies only for objects with duration-based event windows, that is, KafkaStream objects created using the timespan argument. For objects created using the numevents argument, the grace period is ignored.
Data Types: string | char
ImportSchema
— Rules for converting event data to MATLAB data types
JSON string in event schema format
Rules for converting event data to MATLAB data types, specified as a JSON string in event schema format. You can specify an event schema more easily using the ImportOptions property.
ExportSchema
— Rules for converting MATLAB data types to event data
JSON string in event schema format
Rules for converting MATLAB data types to event data, specified as a JSON string in event schema format. You can specify an event schema more easily using the ExportOptions property.
Properties
Host
— Hostname of Kafka server
character vector | string scalar
Hostname of the Kafka server, specified as a character vector or string scalar.
Example: '144.213.5.7' or 'localhost'
Data Types: char | string
Port
— Port number of Kafka server
integer in range [0, 65,535]
Port number of the Kafka server, specified as an integer in the range [0, 65,535].
Example: 9092
Topic
— Kafka topic name
character vector | string scalar
Kafka topic name, specified as a character vector or string scalar.
Example: "CoolingFan"
Data Types: char | string
Group
— Kafka consumer group ID
UUID (default) | character vector | string scalar
Kafka consumer group ID, specified as a character vector or string scalar.
Multiple Kafka consumers can belong to the same consumer group. In that case, Kafka shares data between the consumers in the group so that no two consumers in the same group ever receive the same messages. By default, every KafkaStream object has a unique consumer group ID, which allows multiple consumers to read from the same topic independently.
Data Types: char | string
Order
— Event order
"EventTime" (default) | "IngestTime"
Strategy to order events in the stream, specified as one of these values:
"EventTime" — Order events based on the time that they occur. This strategy ensures chronological event-time order even when events arrive out of order at the Kafka server.
"IngestTime" — Order events based on the time that they appear in the stream.
You cannot set the value of this property after object creation.
Data Types: string | char
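The difference between the two strategies can be pictured with a plain timetable. In this sketch, three hypothetical events reached the server in one order but carry event timestamps in another; keeping the arrival order corresponds to "IngestTime", and sorting by the event timestamps with sortrows corresponds to "EventTime". This is only an analogy; the framework applies the ordering itself, and the sketch says nothing about how it buffers late events.

```matlab
% Hypothetical events listed in the order they reached the Kafka server
% (ingest order). Their event timestamps are out of order.
eventTime = datetime(2020,10,31,0,0,0) + seconds([3; 1; 2]);
vMotor = [1.1454; 1.0909; 1.1506];
arrivals = timetable(eventTime,vMotor);   % rows kept in arrival order

% Event-time order: sort the rows by their event timestamps.
byEventTime = sortrows(arrivals)
```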
GracePeriod
— Time to wait for messages
"0 Seconds" (default) | string scalar
This property is read-only.
Time that the KafkaStream object waits for messages, specified as a string scalar of the form "Length Units", where:
Length is the length of the grace period, as specified by the GracePeriod argument during object creation.
Units is the unit of the grace period, as specified by the GraceUnit argument during object creation.
When you create the object, if you do not specify a grace period, then the GracePeriod property is set to "0 Seconds" (no grace period).
Example: "10 Minutes"
Data Types: string
WindowSize
— Event window size
50 (default) | duration scalar | positive integer
This property is read-only.
Event window size, specified as a fixed amount of time (using the timespan argument) or a fixed number of messages (using the numevents argument).
Data Types: duration | single | double | int8 | int16 | int32 | int64 | uint8 | uint16 | uint32 | uint64
ReadLimit
— Wait strategy
"Size" (default) | "Time"
Strategy to wait for a response from the stream, specified as one of these values:
"Size" — Client prioritizes filling the event window. Using this strategy, the client might wait longer than the RequestTimeout time period as long as it is still receiving the expected number of messages. The default number of messages is 50. If the client receives no messages within the RequestTimeout time period, it stops waiting.
"Time" — Client strictly adheres to the RequestTimeout limit, even if it has not received the expected number of messages.
RequestTimeout specifies the amount of time the stream object waits between receiving events. If the stream is actively receiving data, it does not time out during that operation.
TimestampResolution
— Unit of event timestamp
"Milliseconds" (default) | "Seconds" | "Minutes" | "Hours" | "Days"
Unit of event timestamp, specified as one of these values:
"Milliseconds"
"Seconds"
"Minutes"
"Hours"
"Days"
The event timestamp is interpreted as the number of these units before or after the UNIX® epoch (00:00:00 UTC on 1 January 1970).
Data Types: string | char
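As an illustration of this interpretation, the sketch converts a hypothetical raw timestamp into a datetime by treating it as a count of TimestampResolution units since the epoch. The raw value was chosen so that, in milliseconds, it corresponds to 31-Oct-2020 00:00:00 UTC, the date that appears in the example output later on this page. The framework performs this conversion itself; the code only shows the arithmetic.

```matlab
rawTimestamp = 1604102400000;   % hypothetical raw event timestamp
resolution = "Milliseconds";    % value of TimestampResolution

% Convert the raw count to a duration in the given unit.
switch resolution
    case "Milliseconds", offset = milliseconds(rawTimestamp);
    case "Seconds",      offset = seconds(rawTimestamp);
    case "Minutes",      offset = minutes(rawTimestamp);
    case "Hours",        offset = hours(rawTimestamp);
    case "Days",         offset = days(rawTimestamp);
    otherwise, error('Unsupported resolution: %s',resolution)
end

% Count the offset from the UNIX epoch, 1970-01-01 00:00:00 UTC.
epoch = datetime(1970,1,1,0,0,0,'TimeZone','UTC');
eventTime = epoch + offset
```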
Connection and Request Timeouts
ConnectionTimeout
— Number of seconds to wait for initial response from Kafka host
30 (default) | positive integer
Number of seconds that a client waits for the initial response from the Kafka host, specified as a positive integer.
Data Types: single | double | int8 | int16 | int32 | int64 | uint8 | uint16 | uint32 | uint64
RequestTimeout
— Number of seconds to wait before terminating request
61 (default) | positive integer
Number of seconds to wait before terminating a request, specified as a positive integer. The wait time includes connecting to the Kafka host as well as data transfer between the Kafka host and the client.
Data Types: single | double | int8 | int16 | int32 | int64 | uint8 | uint16 | uint32 | uint64
Import and Export Options
ImportOptions
— Rules for transforming stream events into MATLAB data
ImportOptions object
Rules for transforming stream events into MATLAB data, specified as an ImportOptions object. This object controls the import of stream events into MATLAB.
ExportOptions
— Rules for transforming MATLAB data into stream events
ExportOptions object
Rules for transforming MATLAB data into stream events, specified as an ExportOptions object. This object controls the export of MATLAB data into streams.
PublishSchema
— Flag to indicate whether export schema is written to output stream
true (default) | false
Flag to indicate whether the export schema is written to the output stream, specified as a logical scalar.
The schema is embedded in each event, which can significantly increase the size of the event. If downstream applications do not require the schema, set this flag to false to reduce the number of bytes in your stream.
Data Types: logical
Event Key and Body Encoding
KeyVariable
— Name of key variable
key (default) | string scalar | character vector
Name of the key variable in the event stream, specified as a string scalar or character vector.
Data Types: string | char
KeyEncoding
— Character encoding format for bits in event key
utf16 (default) | utf8 | base64 | uint8
Character encoding format used to interpret the bits in an event key, specified as one of the following:
utf8 — UTF-8 encoding format
utf16 — UTF-16 encoding format
base64 — Base 64 encoding format
uint8 — Eight-bit unsigned binary bytes
If KeyEncoding is utf8 or utf16, then the KeyType property must be text. If KeyEncoding is base64 or uint8, then KeyType must be one of the numeric encoding formats.
KeyType
— Character encoding scheme for bytes in event key
text (default) | int8 | uint8 | int16 | uint16 | int32 | uint32 | int64 | uint64 | single | double
Character encoding scheme used to interpret the bytes in an event key, specified as one of these values:
uint8 — One-byte unsigned integer
int8 — One-byte signed integer
uint16 — Two-byte unsigned integer
int16 — Two-byte signed integer
uint32 — Four-byte unsigned integer
int32 — Four-byte signed integer
uint64 — Eight-byte unsigned integer
int64 — Eight-byte signed integer
single — Single-precision IEEE 754 floating point number
double — Double-precision IEEE 754 floating point number
text — String
If KeyType is text, then the KeyEncoding property must be either utf8 or utf16. If KeyType is any of the other numeric encoding formats, then KeyEncoding must be either base64 or uint8.
KeyByteOrder
— Order for storing bytes in event key
BigEndian (default) | LittleEndian | MatchHost | NotApplicable
Order for storing bytes in the event key, specified as one of the following:
LittleEndian — Least significant byte is stored first
BigEndian — Most significant byte is stored first
MatchHost — Bytes are stored in the same order as is used by the host computer on which the streaming data framework is running
NotApplicable — Not an integer key
This property is applicable only for integer keys and not applicable to floating point or text keys.
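To see how KeyEncoding, KeyType, and KeyByteOrder combine, the sketch below decodes a hypothetical key with KeyEncoding base64, KeyType uint32, and KeyByteOrder BigEndian, using only core MATLAB functions (matlab.net.base64decode, typecast, swapbytes, computer). The key value is made up, and this is not the framework's own decoder.

```matlab
% Hypothetical event key carrying a uint32 value, encoded as base64.
% In big-endian order, the bytes 00 00 01 2C encode the value 300.
encodedKey = "AAABLA==";

% KeyEncoding base64: decode the text into raw bytes (uint8 row vector).
keyBytes = matlab.net.base64decode(encodedKey);

% KeyType uint32: reinterpret the four bytes as one uint32.
% typecast uses the byte order of the host computer.
keyValue = typecast(keyBytes,'uint32');

% KeyByteOrder BigEndian: swap the bytes if the host is little-endian.
[~,~,hostEndian] = computer;
if hostEndian == 'L'
    keyValue = swapbytes(keyValue);
end
keyValue
```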
BodyEncoding
— Character encoding format for bits in event body
utf8 (default) | uint8 | utf16 | base64
Character encoding format used to interpret the bits in the event body, specified as one of the following:
utf8 — UTF-8 encoding format
utf16 — UTF-16 encoding format
base64 — Base 64 encoding format
uint8 — Eight-bit unsigned binary bytes
This property determines the size and encoding of the bytes used in the event body, which are in the format specified by BodyFormat.
BodyFormat
— Format of bytes in event body
JSON (default) | Array | Text | Binary
Format of the bytes in the event body, specified as one of the following:
JSON — JSON string
Array — MATLAB array
Text — String data
Binary — Binary data
Depending on the encoding specified by BodyEncoding, bytes can be larger than eight bits.
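BodyEncoding and BodyFormat apply in layers: the encoding describes how the raw bytes are carried, and the format describes what those bytes contain. The sketch below assumes a hypothetical JSON body carried with base64 encoding and peels off the layers with core MATLAB functions (unicode2native, matlab.net.base64encode, matlab.net.base64decode, native2unicode, jsondecode). It illustrates the layering only; the framework's own decoding may differ.

```matlab
% Hypothetical JSON event body (BodyFormat JSON).
body = '{"vMotor":1.1506,"wMotor":100.5,"Tmass":25.17}';

% BodyEncoding base64: the body bytes travel as base64 text.
encodedBody = matlab.net.base64encode(unicode2native(body,'UTF-8'));

% Reverse the layers: base64 text -> bytes -> UTF-8 text -> struct.
decodedBody = native2unicode(matlab.net.base64decode(encodedBody),'UTF-8');
fields = jsondecode(decodedBody)
```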
Object Functions
Import and Export
readtimetable | Read timetable from event stream |
writetimetable | Write timetable to event stream |
seek | Set read position in event stream |
preview | Preview subset of events from event stream |
identifyingName | Event stream name |
detectImportOptions | Create import options based on event stream content |
detectExportOptions | Create export options based on event stream content |
Kafka Stream Operations
readevents | Read raw events from Kafka stream without schema processing applied |
flush | Reset read window boundaries |
stop | Stop processing event streams from Kafka topic |
loggederror | Error information for Kafka stream operation |
createTopic | Create topic in Kafka cluster |
deleteTopic | Remove topic from Kafka cluster |
Kafka Provider Properties
categoryList | Kafka stream provider property list |
getProviderProperties | Kafka stream configuration property data |
setProviderProperties | Set properties specific to Kafka configuration |
isProperty | Determine if Kafka stream provider property is set |
Examples
Set Kafka Security Protocol
Assume that you have a Kafka server running at the network address kafka.host.com:9092 that has a topic CoolingFan.
Assume that the Kafka host is configured to use SSL. To configure SSL communication between the Kafka host and the client, provide SSL configuration settings when creating an object for reading and writing to the Kafka topic.
ks = kafkaStream("kafka.host.com",9092,"CoolingFan", ...
    "security.protocol","SASL_SSL", ...
    "ssl.truststore.type","PEM", ...
    "ssl.truststore.location","prodserver.pem")
ks = 
  KafkaStream with properties:
    Topic: "CoolingFan"
    Group: "da576775-49c9-4de3-9955-2bdd9f963aa0"
    Order: EventTime
    Host: "kafka.host.com"
    Port: 9092
    ConnectionTimeout: 30
    RequestTimeout: 61
    ImportOptions: "Import to MATLAB types"
    ExportOptions: "Source: function eventSchema"
    PublishSchema: "true"
    WindowSize: 50
    KeyVariable: "key"
    KeyEncoding: "utf16"
    KeyType: "text"
    KeyByteOrder: "BigEndian"
    BodyEncoding: "utf8"
    BodyFormat: "JSON"
    ReadLimit: "Size"
    TimestampResolution: "Milliseconds"
Confirm which properties are set.
props = getProviderProperties(ks);
unique({props.name}')
ans =
  7×1 cell array
    {'auto.offset.reset'      }
    {'retention.ms'           }
    {'sasl.jaas.config'       }
    {'sasl.username'          }
    {'security.protocol'      }
    {'ssl.truststore.location'}
    {'ssl.truststore.type'    }
Read Specific Number of Kafka Messages
Assume that you have a Kafka server running at the network address kafka.host.com:9092 that has a topic CoolingFan.
Create an object connected to the CoolingFan topic and request only 10 messages instead of the default 50.
ks = kafkaStream("kafka.host.com",9092,"CoolingFan",Rows=10)
ks = 
  KafkaStream with properties:
    Topic: "CoolingFan"
    Group: "da576775-49c9-4de3-9955-2bdd9f963aa0"
    Order: EventTime
    Host: "kafka.host.com"
    Port: 9092
    ConnectionTimeout: 30
    RequestTimeout: 61
    ImportOptions: "Import to MATLAB types"
    ExportOptions: "Source: function eventSchema"
    PublishSchema: "true"
    WindowSize: 10
    KeyVariable: "key"
    KeyEncoding: "utf16"
    KeyType: "text"
    KeyByteOrder: "BigEndian"
    BodyEncoding: "utf8"
    BodyFormat: "JSON"
    ReadLimit: "Size"
    TimestampResolution: "Milliseconds"
Use the object to read 10 messages from the event stream into a timetable.
tt = readtimetable(ks)
tt =
10×11 timetable
timestamp vMotor wMotor Tmass
____________________ ______ ______ ______
31-Oct-2020 00:00:00 1.0909 0 25
31-Oct-2020 00:00:00 1.1506 100.5 25.17
31-Oct-2020 00:00:00 1.1739 190.9 25.223
31-Oct-2020 00:00:00 1.1454 330.61 25.15
31-Oct-2020 00:00:00 1.1346 382.77 25.122
31-Oct-2020 00:00:00 1.1287 420.88 25.106
31-Oct-2020 00:00:00 1.1253 454.55 25.096
31-Oct-2020 00:00:00 1.1232 478.1 25.09
31-Oct-2020 00:00:00 1.1217 500.16 25.086 ...
Version History
Introduced in R2022b