What are source and sink connectors?
→ Source connectors pull data from an external system (the source) and write it to Kafka topics.
→ Sink connectors read data from Kafka topics and push it to an external system (the sink).
→ Each connector is unidirectional; you can’t go against the flow.
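To make the one-way flow concrete, here is a toy in-memory model. It is a sketch under assumptions, not Kafka Connect's API. The `Topic` class stands in for a Kafka topic (an append-only log). The hypothetical `run_source` and `run_sink` functions play the two connector roles: the source only appends to the topic, and the sink only reads from it.

```python
from dataclasses import dataclass, field


@dataclass
class Topic:
    """Hypothetical stand-in for a Kafka topic: an append-only log of records."""
    records: list[str] = field(default_factory=list)


def run_source(external_lines: list[str], topic: Topic) -> None:
    """Source role: pull from the external system and append to the topic."""
    for line in external_lines:
        topic.records.append(line)


def run_sink(topic: Topic, start_offset: int, external_out: list[str]) -> int:
    """Sink role: read the topic from start_offset and push to the external system.

    Returns the next offset to read, i.e. the sink's new position in the log.
    """
    for record in topic.records[start_offset:]:
        external_out.append(record)
    return len(topic.records)


topic = Topic()
sink_file: list[str] = []
run_source(["foo", "bar"], topic)            # source file -> topic
next_offset = run_sink(topic, 0, sink_file)  # topic -> sink file
print(sink_file, next_offset)                # ['foo', 'bar'] 2
```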
Now we will send data from the source file test.txt to the sink file test.sink.txt.
test.txt
foo
bar
Source connector configuration
connect-file-source.properties
# name can be anything
name=local-file-source
# kind of connector
connector.class=FileStreamSource
# maximum number of tasks this connector may run in parallel
tasks.max=1
# connector will read from this input source (\ is an escape character in .properties files, so Windows paths need doubled backslashes)
file=D:\\LOGS\\test.txt
# connector will send output to this topic
topic=connect-test
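Why is the path written as `D:\\LOGS\\test.txt`? Kafka loads these files in Java's properties format, where a backslash starts an escape sequence, so a doubled backslash stands for one literal backslash. The hypothetical `parse_properties` helper below is a deliberately simplified reader that shows only that rule. It is not `java.util.Properties` and ignores line continuations, `:` separators, and unicode escapes.

```python
def parse_properties(text: str) -> dict[str, str]:
    """Simplified .properties reader (hypothetical helper, for illustration only).

    Handles blank lines, '#' and '!' comment lines, 'key=value' pairs, and
    a doubled backslash becoming a single one.
    """
    props: dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line[0] in "#!":
            continue  # skip blank lines and comments
        key, _, value = line.partition("=")
        # A doubled backslash in the file means one literal backslash.
        props[key.strip()] = value.lstrip().replace("\\\\", "\\")
    return props


source_config = r"""
# name can be anything
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=D:\\LOGS\\test.txt
topic=connect-test
"""
props = parse_properties(source_config)
print(props["file"])  # D:\LOGS\test.txt
```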
connect-file-sink.properties
# name can be anything
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=D:\\LOGS\\test.sink.txt
# sink connectors take "topics" (plural, a comma-separated list), not "topic"
topics=connect-test
connect-standalone.properties
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
# important field: in standalone mode, source connector offsets are stored in this local file
offset.storage.file.filename=/tmp/connect.offsets
# how often (in milliseconds) offsets are flushed to that file
offset.flush.interval.ms=10000
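The offset file matters because it lets a restarted source connector continue where it left off instead of re-sending every line. The toy below imitates that idea for a line-based file source: it remembers the byte position it has read up to and resumes from there. This is a sketch under assumptions. The real worker stores offsets in its own internal format and flushes them every `offset.flush.interval.ms`. The hypothetical `read_new_lines` helper instead writes a small JSON file (the name `connect.offsets.json` is made up) right away.

```python
import json
import os
import tempfile


def read_new_lines(source_path: str, offset_path: str) -> list[str]:
    """Return complete lines added to source_path since the saved position (toy)."""
    position = 0
    if os.path.exists(offset_path):
        with open(offset_path) as f:
            position = json.load(f)["position"]
    lines = []
    with open(source_path, "rb") as src:
        src.seek(position)  # resume from the saved byte offset
        for raw in src:
            if not raw.endswith(b"\n"):
                break  # unterminated last line: leave it for a later call
            lines.append(raw.decode("utf-8").rstrip("\r\n"))
            position += len(raw)
    with open(offset_path, "w") as f:
        json.dump({"position": position}, f)  # persist the new position
    return lines


with tempfile.TemporaryDirectory() as tmp:
    src = os.path.join(tmp, "test.txt")
    offsets = os.path.join(tmp, "connect.offsets.json")  # hypothetical file name
    with open(src, "w", newline="\n") as f:
        f.write("foo\nbar\n")
    first_run = read_new_lines(src, offsets)   # ['foo', 'bar']
    second_run = read_new_lines(src, offsets)  # []: nothing new since the saved offset
    with open(src, "a", newline="\n") as f:
        f.write("baz\n")
    third_run = read_new_lines(src, offsets)   # ['baz']
```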
Start the connectors in standalone mode
> cd bin\windows
> .\connect-standalone.bat ..\..\config\connect-standalone.properties ..\..\config\connect-file-source.properties ..\..\config\connect-file-sink.properties
Check the console consumer
> cd bin\windows
> .\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
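Each message looks like that because of `value.converter.schemas.enable=true`: JsonConverter wraps every value in an envelope with a `schema` part and a `payload` part. The sink side's converter unwraps it again, so test.sink.txt should get plain `foo` and `bar`. The sketch below imitates the envelope for plain string values only. `wrap_string` and `unwrap` are hypothetical helpers, not the converter's code.

```python
import json


def wrap_string(value: str) -> str:
    """Imitate the schemas.enable=true JSON envelope for a plain string value."""
    envelope = {"schema": {"type": "string", "optional": False}, "payload": value}
    # Compact separators reproduce the spacing seen in the console output.
    return json.dumps(envelope, separators=(",", ":"))


def unwrap(message: str) -> object:
    """Recover the original value from the envelope."""
    return json.loads(message)["payload"]


line = wrap_string("foo")
print(line)          # {"schema":{"type":"string","optional":false},"payload":"foo"}
print(unwrap(line))  # foo
```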
Check the consumer groups
> cd bin\windows
> .\kafka-consumer-groups.bat --bootstrap-server localhost:9092 --list
connect-local-console-sink
connect-local-file-sink
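Both groups come from sink connectors. By default, Kafka Connect runs each sink connector in its own consumer group named `connect-` followed by the connector's `name`. So `name=local-file-sink` appears as `connect-local-file-sink`, and `connect-local-console-sink` presumably belongs to another sink connector named `local-console-sink`. The hypothetical helpers below show that mapping in both directions. Matching on the prefix is only a heuristic, because any group can start with `connect-` and the group id can be overridden.

```python
CONNECT_PREFIX = "connect-"


def sink_group_id(connector_name: str) -> str:
    """Default consumer group id of a sink connector."""
    return CONNECT_PREFIX + connector_name


def connector_names(group_ids: list[str]) -> list[str]:
    """Heuristically map group ids from `--list` back to sink connector names."""
    return [g[len(CONNECT_PREFIX):] for g in group_ids if g.startswith(CONNECT_PREFIX)]


listed = ["connect-local-console-sink", "connect-local-file-sink"]
print(sink_group_id("local-file-sink"))  # connect-local-file-sink
print(connector_names(listed))           # ['local-console-sink', 'local-file-sink']
```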
Describe the consumer groups
> .\kafka-consumer-groups.bat --bootstrap-server localhost:9092 --describe --group connect-local-console-sink
Let's understand the output of the describe command:
partition=0 # therefore only one partition
current-offset=42 # if lag = n, the consumer still has to advance current-offset by n to catch up
log-end-offset=42
lag=0 # lag = log-end-offset - current-offset, so 0 means there's nothing left to process
(Image: bringing data from the broker to the consumer)
Let's understand the above image:
partition=0
current-offset=20 # notice that 20 and 42 are different
log-end-offset=42 # log-end-offset and current-offset don't match
lag=22 # 42 - 20 = 22 records have not been consumed yet
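The relationship between the three numbers is a single subtraction per partition: lag = log-end-offset − current-offset. The sketch below encodes that with both sets of values from this post. `PartitionOffsets` is a hypothetical name used only for illustration.

```python
from dataclasses import dataclass


@dataclass
class PartitionOffsets:
    partition: int
    current_offset: int  # committed position: next offset the group will read
    log_end_offset: int  # offset the next record written to the partition will get

    @property
    def lag(self) -> int:
        """Records written to the partition but not yet consumed by the group."""
        return self.log_end_offset - self.current_offset


caught_up = PartitionOffsets(partition=0, current_offset=42, log_end_offset=42)
behind = PartitionOffsets(partition=0, current_offset=20, log_end_offset=42)
print(caught_up.lag, behind.lag)  # 0 22
```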
Now let's add more lines to the test.txt source file (make sure to press Enter after the last line).
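Why press Enter? As far as I understand its implementation, FileStreamSource reads the file line by line and holds back a last line that has no trailing newline, because it may still be incomplete. The hypothetical `split_complete_lines` helper below imitates that rule on a string: complete lines are returned, and the unterminated rest waits for its newline. It is a sketch, not the connector's code.

```python
def split_complete_lines(buffer: str) -> tuple[list[str], str]:
    """Split text into newline-terminated lines plus the unterminated remainder."""
    *complete, remainder = buffer.split("\n")
    # Strip a trailing '\r' so Windows line endings ("\r\n") are handled too.
    return [line.rstrip("\r") for line in complete], remainder


first_lines, first_pending = split_complete_lines("baz\nqux")
print(first_lines, repr(first_pending))    # ['baz'] 'qux'  ('qux' still waits for Enter)
second_lines, second_pending = split_complete_lines(first_pending + "\n")
print(second_lines, repr(second_pending))  # ['qux'] ''
```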
Happy Learning….
That’s all, folks… take it easy.