kafkaStream
This object requires Streaming Data Framework for MATLAB® Production Server™.
Description
The kafkaStream function creates a
KafkaStream object that connects to a Kafka® topic, reads event streams from that topic, and writes event streams to it.
An event consists of three parts:
Key — Identifies event source
Timestamp — Indicates time at which event occurred
Body — Contains event data specified as an unordered set of (name, value) pairs
After creating a KafkaStream object, use the readtimetable
function to read the events into a timetable or the writetimetable
function to write a timetable to the stream.
readtimetable converts events into rows of a timetable. The names in
the event body become the timetable column names, the value associated with each name becomes
the column value in the event row, and the event timestamp becomes the row timestamp.
writetimetable converts rows of a timetable into events in a
stream.
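As a rough illustration of the mapping that readtimetable performs, the following sketch converts two hand-built events into timetable rows using only base MATLAB functions. The events struct array, its field names (key, timestamp, body), and all values are hypothetical stand-ins for stream events. They are not the framework's internal representation.

```matlab
% Two hypothetical events, each with a key, a timestamp, and a body of
% (name, value) pairs. All values are made up for illustration.
t0 = datetime(2020,10,31,0,0,0);
events = struct( ...
    'key',       {"fan1", "fan1"}, ...
    'timestamp', {t0, t0 + seconds(1)}, ...
    'body',      {struct('vMotor',1.0909,'wMotor',0), ...
                  struct('vMotor',1.1506,'wMotor',100.5)});

% The names in each event body become the timetable variable names.
tt = struct2table(vertcat(events.body));

% The event timestamp becomes the row time.
tt = table2timetable(tt,RowTimes=vertcat(events.timestamp));

% Keep the event key as an additional variable.
tt.key = vertcat(events.key);
disp(tt)
```

Each event ends up as one row, which is the shape that readtimetable is described as returning.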
Creation
Syntax
Description
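Row-Based Event Window
ks = kafkaStream(host,port,topic,Rows=numevents)
creates a KafkaStream object that reads numevents events from the specified topic in each event window.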
Duration-Based Event Window
ks = kafkaStream(host,port,topic,Duration=timespan)
creates a KafkaStream object that reads stream events occurring during
the specified timestamp span timespan.
Additional Options
ks = kafkaStream(___,Name=Value)
specifies event stream options using one or more name-value
arguments. You can also set properties
using name-value arguments. You can use these name-value arguments and properties to
specify how events are converted to and from timetables.
Input Arguments
Number of events in the event window, specified as a positive integer.
Rows=numevents specifies the number of
rows that a call to the readtimetable function returns. If fewer than the specified number of
rows are available for reading, then readtimetable times out and
returns an empty timetable.
readtimetable does not return until it processes all events
in the window, so windows with large row values can block other processes from
continuing. To configure a timeout period to prevent blocking, use the
ReadLimit property.
Example: Rows=500 specifies that each call to
readtimetable returns a timetable with 500 rows.
Data Types: single | double | int8 | int16 | int32 | int64 | uint8 | uint16 | uint32 | uint64
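Because a timed-out read returns an empty timetable, calling code usually needs to check for that case. The following is a minimal sketch, assuming the Kafka host kafka.host.com:9092 and the CoolingFan topic used in the examples on this page are reachable. The messages it prints are illustrative only.

```matlab
% Assumes a reachable Kafka host and an existing CoolingFan topic.
ks = kafkaStream("kafka.host.com",9092,"CoolingFan",Rows=500);

% Request one window of 500 events.
tt = readtimetable(ks);

if isempty(tt)
    % Fewer than 500 events were available and the read timed out.
    disp("No complete window of events was available.")
else
    fprintf("Read %d events.\n",height(tt))
end
```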
Timestamp span in the event window, specified as a duration scalar.
Duration=timespan determines the events
that the readtimetable function returns based on their timestamps.
timespan specifies the difference between the last and first
timestamps of events in the event window.
readtimetable does not return until it processes all events
in the window, so windows with large durations can block other processes from
continuing. To configure a timeout period to prevent blocking, use the
ReadLimit property.
Example: Duration=minutes(1) specifies that each call to
readtimetable returns a timetable that has one minute's worth of
events, where the timestamp of the last event is no more than one minute later than
the timestamp of the first event.
Data Types: duration
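The window rule can be pictured with plain datetime arithmetic. This sketch uses made-up timestamps and only illustrates the stated rule. It is not the framework's windowing code.

```matlab
% Hypothetical event timestamps, sorted in time order.
t = datetime(2020,10,31,0,0,0) + seconds([0 20 45 60 75 130])';
timespan = minutes(1);

% Keep events whose timestamp is no more than one timespan later than the
% first timestamp in the window.
inWindow = (t - t(1)) <= timespan;
windowEvents = t(inWindow);

% Difference between the last and first timestamps in the window.
windowLength = windowEvents(end) - windowEvents(1);
```

With these values, the events at 0, 20, 45, and 60 seconds fall inside the one-minute window, and the events at 75 and 130 seconds fall outside it.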
Name of a Kafka provider property, specified as a character vector or string scalar. Use
single or double quotes around propname. Kafka property names always contain at least one dot character, for example,
retention.ms. For a list of Kafka properties, see the Kafka documentation: https://kafka.apache.org/documentation/#configuration.
The value of the property, propval,
must follow the property name. Specify the property name and its corresponding value as
a comma-separated pair.
Example: kafkaStream(host,port,topic,"security.protocol","SASL_SSL")
sets the Kafka configuration property security.protocol to
SASL_SSL.
Value of a Kafka provider property. For a list of Kafka properties and their values, see the Kafka documentation: https://kafka.apache.org/documentation/#configuration.
The value of the property must follow the property name propname. Specify the property name and its corresponding value as a
comma-separated pair. You can specify propval as any supported
MATLAB data type, but it must be possible to convert that value to a
string.
Example: kafkaStream(host,port,topic,"sasl.mechanism","SCRAM-SHA-512")
sets the value of the Kafka configuration property sasl.mechanism to
SCRAM-SHA-512.
Name-Value Arguments
Specify optional pairs of arguments as
Name1=Value1,...,NameN=ValueN, where Name is
the argument name and Value is the corresponding value.
Name-value arguments must appear after other arguments, but the order of the
pairs does not matter.
Grace Period
Length of time, in GraceUnit units, to wait for messages in the requested event window,
specified as a real scalar or duration scalar. The KafkaStream
object waits until the end of the grace period to return events read from the
stream. The GracePeriod and GraceUnit
arguments together set the GracePeriod property.
This argument applies only for objects with duration-based event windows, that
is, KafkaStream objects created using the
timespan argument. For objects created using the
numevents argument, the grace period is ignored.
Example: 10
Example: minutes(10)
Data Types: single | double | int8 | int16 | int32 | int64 | uint8 | uint16 | uint32 | uint64 | duration
Unit of time for the grace period specified by the GracePeriod name-value argument, specified as one of these
values:
"Milliseconds"
"Seconds"
"Minutes"
"Hours"
"Days"
The GracePeriod and GraceUnit
arguments together set the GracePeriod property.
The KafkaStream object converts
GracePeriod duration scalars to the units specified by
GraceUnit. For example, suppose you specify a two-minute
grace period using the minutes function but set the units to
seconds. The GracePeriod property displays the grace period in
seconds.
ks = kafkaStream(host,port,topic,Duration=minutes(10), ...
    GracePeriod=minutes(2),GraceUnit="Seconds")
ks =
  KafkaStream with properties:
    ...
    GracePeriod: "120 Seconds"
    ...
This argument applies only for objects with duration-based event windows, that
is, KafkaStream objects created using the
timespan argument. For objects created using the
numevents argument, the grace period is ignored.
Data Types: string | char
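The unit conversion described for GraceUnit can be reproduced with standard duration functions. This is a sketch of the arithmetic only, under the assumption that the displayed text is the converted length followed by the unit name. It does not show how the KafkaStream object performs the conversion internally.

```matlab
gracePeriod = minutes(2);      % value passed as GracePeriod
graceUnit   = "Seconds";       % value passed as GraceUnit

% The MATLAB functions milliseconds, seconds, minutes, hours, and days are
% the lowercase forms of the allowed GraceUnit values. Applied to a
% duration, each one returns the length in that unit as a number.
toUnit = str2func(char(lower(graceUnit)));
len = toUnit(gracePeriod);

% Text in the same "Length Units" form as the GracePeriod property.
displayText = string(len) + " " + graceUnit;
```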
Schema
Rules for converting event data to MATLAB data types, specified as a JSON string in event schema format. You can
specify an event schema more easily using the ImportOptions property.
Rules for converting MATLAB data types to event data, specified as a JSON string in event schema
format. You can specify an event schema more easily using the ExportOptions property.
Properties
Hostname of the Kafka server, specified as a character vector or string scalar.
Example: '144.213.5.7' or 'localhost'
Data Types: char | string
Port number of the Kafka server, specified as an integer in the range [0, 65535].
Example: 9092
Kafka topic name, specified as a character vector or string scalar.
Example: "CoolingFan"
Data Types: char | string
Kafka consumer group ID, specified as a character vector or string scalar.
Multiple Kafka consumers can belong to the same consumer group. In that case, Kafka shares data between the consumers in the group so that no two consumers in
the same group ever receive the same messages. By default, every
KafkaStream object has a unique consumer group ID, which allows
multiple consumers to read from the same topic independently.
Data Types: char | string
Strategy to order events in the stream, specified as one of these values:
"EventTime": Order events based on the time that they occur. Ensures event-time chronological order even when events arrive out of order at the Kafka server.
"IngestTime": Order events based on the time that they appear in the stream.
You cannot set the value of this property after object creation.
Data Types: string | char
This property is read-only.
Time that the KafkaStream object waits for messages, specified as a
string scalar of the form "Length Units", where:
Length is the length of the grace period, as specified by the GracePeriod argument during object creation.
Units is the units of the grace period, as specified by the GraceUnit argument during object creation.
When you create the object, if you do not specify a grace period, then the
GracePeriod property is set to "0 Seconds" (no
grace period).
Example: "10 Minutes"
Data Types: string
This property is read-only.
Event window size, specified as a fixed amount of time (using the
timespan argument) or a fixed number of
messages (using the numevents argument).
Data Types: duration | single | double | int8 | int16 | int32 | int64 | uint8 | uint16 | uint32 | uint64
Strategy to wait for a response from the stream, specified as one of these values:
"Size": Client prioritizes filling the event window. Using this strategy, the client might wait longer than the RequestTimeout time period as long as it is still receiving the expected number of messages. The default number of messages is 50. If the client receives no messages within the RequestTimeout time period, it no longer waits.
"Time": Client strictly adheres to the RequestTimeout limit, even if it has not received the expected number of messages.
RequestTimeout specifies the amount of time the stream object waits between receiving events. If the stream is actively receiving data, it does not time out during that operation.
Unit of event timestamp, specified as one of these values:
"Milliseconds"
"Seconds"
"Minutes"
"Hours"
"Days"
Interpret the event timestamp as the number of corresponding units before or after the UNIX® epoch.
Data Types: string | char
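To make the epoch interpretation concrete, this sketch converts raw numeric timestamps to datetime values for two of the listed units. The raw values are hypothetical, and the sketch assumes the epoch is expressed in UTC.

```matlab
% UNIX epoch, assumed here to be in UTC.
epoch = datetime(1970,1,1,0,0,0,TimeZone="UTC");

rawMs = 1604102400000;     % hypothetical raw timestamp in "Milliseconds"
rawS  = 1604102400;        % the same instant expressed in "Seconds"

tFromMs = epoch + milliseconds(rawMs);
tFromS  = epoch + seconds(rawS);

% A negative value is interpreted as a time before the epoch.
tBefore = epoch + days(-1);
```

Both tFromMs and tFromS resolve to 31-Oct-2020 00:00:00 UTC, which shows why the unit must match the producer's convention: the same number read with the wrong unit lands on a very different date.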
Connection and Request Timeouts
Number of seconds that a client waits for the initial response from the Kafka host, specified as a positive integer.
Data Types: single | double | int8 | int16 | int32 | int64 | uint8 | uint16 | uint32 | uint64
Number of seconds to wait before terminating a request, specified as a positive integer. The wait time includes connecting to the Kafka host as well as data transfer between the Kafka host and the client.
Data Types: single | double | int8 | int16 | int32 | int64 | uint8 | uint16 | uint32 | uint64
Import and Export Options
Rules for transforming stream events into MATLAB data, specified as an ImportOptions object. This object controls the import of stream events
into MATLAB.
Rules for transforming MATLAB data into stream events, specified as an ExportOptions object. This object controls the export of MATLAB data into streams.
Flag to indicate whether the export schema is written to the output stream, specified as a logical scalar.
The schema is embedded in each event, which can significantly increase the size of the event. If downstream applications do not require the schema, set this flag to false to reduce the number of bytes in your stream.
Data Types: logical
Event Key and Body Encoding
Name of the key variable in the event stream, specified as a string scalar or character vector.
Data Types: string | char
Character encoding format used to interpret the bits in an event key, specified as one of the following:
utf8: UTF-8 encoding format
utf16: UTF-16 encoding format
base64: Base 64 encoding format
uint8: Eight-bit unsigned binary bytes
If KeyEncoding is utf8 or utf16,
then the KeyType property must be text. If
KeyEncoding is base64 or
uint8, then KeyType must be one of the numeric
encoding formats.
Character encoding scheme used to interpret the bytes in an event key, specified as one of these values:
uint8: One-byte unsigned integer
int8: One-byte signed integer
uint16: Two-byte unsigned integer
int16: Two-byte signed integer
uint32: Four-byte unsigned integer
int32: Four-byte signed integer
uint64: Eight-byte unsigned integer
int64: Eight-byte signed integer
single: Single-precision IEEE 754 floating point number
double: Double-precision IEEE 754 floating point number
text: String
If KeyType is text, then the KeyEncoding property must be either utf8 or utf16. If KeyType is any of the other numeric encoding formats, then KeyEncoding must be either base64 or uint8.
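The pairing rule between KeyType and KeyEncoding can be written as a small check. The helper isValidKeyPair below is hypothetical and is not part of the framework. It only encodes the rule as stated above and does not verify that keyType is one of the listed values.

```matlab
% Hypothetical helper: returns true when the KeyType and KeyEncoding values
% satisfy the pairing rule. Text keys need a text encoding, and all other
% key types need base64 or uint8.
isValidKeyPair = @(keyType,keyEncoding) ...
    (keyType == "text" && ismember(keyEncoding,["utf8","utf16"])) || ...
    (keyType ~= "text" && ismember(keyEncoding,["base64","uint8"]));

okText    = isValidKeyPair("text","utf16");    % text key, text encoding
okNumeric = isValidKeyPair("int32","uint8");   % numeric key, binary encoding
badPair   = isValidKeyPair("text","base64");   % text key, binary encoding
```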
Order for storing bytes in the event key, specified as one of the following:
LittleEndian: Least significant byte is stored first
BigEndian: Most significant byte is stored first
MatchHost: Bytes are stored in the same order as is used by the host computer on which the streaming data framework is running
NotApplicable: Not an integer key
This property is applicable only for integer keys and not applicable to floating point or text keys.
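The effect of byte order on an integer key can be seen with typecast, which exposes the bytes of a value in the host computer's order. This sketch uses a hypothetical uint32 key and base MATLAB functions only. It does not call the framework.

```matlab
key = uint32(258);                      % hypothetical integer key, 0x00000102

% typecast returns the bytes in the byte order of the host computer.
hostBytes = typecast(key,'uint8');

% The third output of computer is 'L' on little-endian hosts and 'B' on
% big-endian hosts.
[~,~,endian] = computer;

if endian == 'L'
    littleEndianBytes = hostBytes;
else
    littleEndianBytes = fliplr(hostBytes);
end
bigEndianBytes = fliplr(littleEndianBytes);
```

For this key, the little-endian layout is the bytes 2, 1, 0, 0 and the big-endian layout is 0, 0, 1, 2, so a reader that assumes the wrong order decodes a different integer.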
Character encoding format used to interpret the bits in the event body, specified as one of the following:
utf8: UTF-8 encoding format
utf16: UTF-16 encoding format
base64: Base 64 encoding format
uint8: Eight-bit unsigned binary bytes
This property determines the size and encoding of the bytes used in the event body,
which are in the format specified by BodyFormat.
Format of bytes in event body, specified as one of the following:
JSON: JSON string
Array: MATLAB array
Text: String data
Binary: Binary data
Depending on the encoding specified by BodyEncoding,
bytes can be larger than eight bits.
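For the combination shown in the examples on this page (BodyFormat JSON with BodyEncoding utf8), the relationship between format and encoding can be sketched with base MATLAB functions. The body contents are hypothetical, and this is an illustration of the two layers, not a description of how the framework serializes events.

```matlab
% Hypothetical event body as (name, value) pairs.
body = struct('vMotor',1.0909,'wMotor',0);

% Format layer: represent the body as JSON text.
jsonText = jsonencode(body);

% Encoding layer: represent the JSON text as UTF-8 bytes.
bodyBytes = unicode2native(jsonText,'UTF-8');

% Reverse both layers to recover the body.
decoded = jsondecode(native2unicode(bodyBytes,'UTF-8'));
```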
Object Functions
readtimetable | Read timetable from event stream |
writetimetable | Write timetable to event stream |
seek | Set read position in event stream |
preview | Preview subset of events from event stream |
identifyingName | Event stream name |
detectImportOptions | Create import options based on event stream content |
detectExportOptions | Create export options based on event stream content |
readevents | Read raw events from Kafka stream without schema processing applied |
flush | Reset read window boundaries |
stop | Stop processing event streams from Kafka topic |
loggederror | Error information for Kafka stream operation |
createTopic | Create topic in Kafka cluster |
deleteTopic | Remove topic from Kafka cluster |
categoryList | Kafka stream provider property list |
getProviderProperties | Kafka stream configuration property data |
setProviderProperties | Set properties specific to Kafka configuration |
isProperty | Determine if Kafka stream provider property is set |
Examples
Assume that you have a Kafka server running at the network address kafka.host.com:9092
that has a topic CoolingFan.
Assume that the Kafka host is configured to use SSL. To configure SSL communication between the Kafka host and the client, provide SSL configuration settings when creating an object for reading and writing to the Kafka topic.
ks = kafkaStream("kafka.host.com",9092,"CoolingFan", ...
    "security.protocol","SASL_SSL", ...
    "ssl.truststore.type","PEM", ...
    "ssl.truststore.location","prodserver.pem")
ks =
KafkaStream with properties:
Topic: "CoolingFan"
Group: "da576775-49c9-4de3-9955-2bdd9f963aa0"
Order: EventTime
Host: "kafka.host.com"
Port: 9092
ConnectionTimeout: 30
RequestTimeout: 61
ImportOptions: "Import to MATLAB types"
ExportOptions: "Source: function eventSchema"
PublishSchema: "true"
WindowSize: 50
KeyVariable: "key"
KeyEncoding: "utf16"
KeyType: "text"
KeyByteOrder: "BigEndian"
BodyEncoding: "utf8"
BodyFormat: "JSON"
ReadLimit: "Size"
TimestampResolution: "Milliseconds"
Confirm which properties are set.
props = getProviderProperties(ks);
unique({props.name}')
ans =
7×1 cell array
{'auto.offset.reset' }
{'retention.ms' }
{'sasl.jaas.config' }
{'sasl.username' }
{'security.protocol' }
{'ssl.truststore.location'}
{'ssl.truststore.type' }
Assume that you have a Kafka server running at the network address kafka.host.com:9092
that has a topic CoolingFan.
Create an object connected to the CoolingFan topic and request only
10 messages instead of the default.
ks = kafkaStream("kafka.host.com",9092,"CoolingFan",Rows=10)
ks =
KafkaStream with properties:
Topic: "CoolingFan"
Group: "da576775-49c9-4de3-9955-2bdd9f963aa0"
Order: EventTime
Host: "kafka.host.com"
Port: 9092
ConnectionTimeout: 30
RequestTimeout: 61
ImportOptions: "Import to MATLAB types"
ExportOptions: "Source: function eventSchema"
PublishSchema: "true"
WindowSize: 10
KeyVariable: "key"
KeyEncoding: "utf16"
KeyType: "text"
KeyByteOrder: "BigEndian"
BodyEncoding: "utf8"
BodyFormat: "JSON"
ReadLimit: "Size"
TimestampResolution: "Milliseconds"
Use the object to read 10 messages from the event stream into a timetable.
tt = readtimetable(ks)
tt =
10×11 timetable
timestamp vMotor wMotor Tmass
____________________ ______ ______ ______
31-Oct-2020 00:00:00 1.0909 0 25
31-Oct-2020 00:00:00 1.1506 100.5 25.17
31-Oct-2020 00:00:00 1.1739 190.9 25.223
31-Oct-2020 00:00:00 1.1454 330.61 25.15
31-Oct-2020 00:00:00 1.1346 382.77 25.122
31-Oct-2020 00:00:00 1.1287 420.88 25.106
31-Oct-2020 00:00:00 1.1253 454.55 25.096
31-Oct-2020 00:00:00 1.1232 478.1 25.09
31-Oct-2020 00:00:00 1.1217 500.16 25.086
...
Version History
Introduced in R2022b