How do I use the Kafka-Kinesis-Connector to connect to my Amazon MSK cluster?
When I try to use the Kafka-Kinesis-Connector to connect with Amazon Managed Streaming for Apache Kafka (Amazon MSK), I receive an error message.
Short description
Prerequisites:
- You have an active AWS subscription.
- You have a virtual private cloud (VPC) that's visible to both the client machine and MSK cluster. The MSK cluster and client must reside in the same VPC.
- You have connectivity to Amazon MSK and Apache ZooKeeper servers.
- There are two subnets associated with your VPC.
- You created topics in MSK to send and receive messages from the server.
Resolution
Build your project file
- To download the Kafka-Kinesis-Connector, clone the kafka-kinesis-connector project from the GitHub website.
- To build the amazon-kinesis-kafka-connector-X.X.X.jar file in the target directory, run the mvn package command:
    [ec2-user@ip-10-0-0-71 kinesis-kafka-connector]$ mvn package
    ..
    ......
    [INFO] Replacing /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/target/amazon-kinesis-kafka-connector-0.0.9-SNAPSHOT.jar with /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/target/amazon-kinesis-kafka-connector-0.0.9-SNAPSHOT-shaded.jar
    [INFO] ------------------------------------------------------------------------
    [INFO] BUILD SUCCESS
    [INFO] ------------------------------------------------------------------------
    [INFO] Total time: 28.822 s
    [INFO] Finished at: 2020-02-19T13:01:31Z
    [INFO] Final Memory: 26M/66M
    [INFO] ------------------------------------------------------------------------
  The Kafka-Kinesis-Connector looks for credentials in the following order: environment variables, Java system properties, and the credentials profile file.
- To update your configuration to the DefaultAWSCredentialsProviderChain setting, run the following command:
    [ec2-user@ip-10-0-0-71 target]$ aws configure
  The aws configure command stores the access key that the connector uses. Make sure that the AWS Identity and Access Management (IAM) user that the access key is attached to has the minimum required permissions, including a policy that allows access to Amazon Kinesis Data Streams or Amazon Kinesis Data Firehose. For more information about setting AWS credentials, see Provide temporary credentials to the AWS SDK for Java.
  Note: If you use a Java Development Kit (JDK), then you can also use the EnvironmentVariableCredentialsProvider class to provide credentials.
- If you use Kinesis Data Streams, then update your policy to look similar to the following example:
    {
      "Version": "2012-10-17",
      "Statement": [{
        "Sid": "Stmt123",
        "Effect": "Allow",
        "Action": [
          "kinesis:DescribeStream",
          "kinesis:PutRecord",
          "kinesis:PutRecords",
          "kinesis:GetShardIterator",
          "kinesis:GetRecords",
          "kinesis:ListShards",
          "kinesis:DescribeStreamSummary",
          "kinesis:RegisterStreamConsumer"
        ],
        "Resource": [
          "arn:aws:kinesis:us-west-2:123xxxxxxxxx:stream/StreamName"
        ]
      }]
    }
  If you use Kinesis Data Firehose, then update your policy to look similar to the following example:
    {
      "Version": "2012-10-17",
      "Statement": [{
        "Effect": "Allow",
        "Action": [
          "firehose:DeleteDeliveryStream",
          "firehose:PutRecord",
          "firehose:PutRecordBatch",
          "firehose:UpdateDestination"
        ],
        "Resource": [
          "arn:aws:firehose:us-west-2:123xxxxxxxxx:deliverystream/DeliveryStreamName"
        ]
      }]
    }
  For more information about the Kinesis Data Firehose delivery stream settings, see Configuration and credential file settings.
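The two example policies differ only in the service, the action list, and the resource ARN, so they can be generated from a few parameters. The following Python sketch is illustrative only: the function name build_policy is hypothetical, and the account ID, Region, and stream names are placeholders that you replace with your own values.

```python
import json

# Actions copied from the example policies in this article.
KINESIS_ACTIONS = [
    "kinesis:DescribeStream",
    "kinesis:PutRecord",
    "kinesis:PutRecords",
    "kinesis:GetShardIterator",
    "kinesis:GetRecords",
    "kinesis:ListShards",
    "kinesis:DescribeStreamSummary",
    "kinesis:RegisterStreamConsumer",
]
FIREHOSE_ACTIONS = [
    "firehose:DeleteDeliveryStream",
    "firehose:PutRecord",
    "firehose:PutRecordBatch",
    "firehose:UpdateDestination",
]


def build_policy(service, region, account_id, name):
    """Return an IAM policy document (as a dict) for one stream.

    service is "kinesis" or "firehose"; the remaining arguments are
    placeholders for your own Region, account ID, and stream name.
    """
    if service == "kinesis":
        actions, resource_type = KINESIS_ACTIONS, "stream"
    elif service == "firehose":
        actions, resource_type = FIREHOSE_ACTIONS, "deliverystream"
    else:
        raise ValueError(f"unsupported service: {service}")
    arn = f"arn:aws:{service}:{region}:{account_id}:{resource_type}/{name}"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {"Effect": "Allow", "Action": list(actions), "Resource": [arn]}
        ],
    }


if __name__ == "__main__":
    # Placeholder values; print the policy so it can be pasted into IAM.
    policy = build_policy("kinesis", "us-west-2", "123456789012", "StreamName")
    print(json.dumps(policy, indent=2))
```

Generating the document from one function keeps the Kinesis Data Streams and Kinesis Data Firehose variants consistent when you change the Region or stream name.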
Configure the connector
Note: You can configure the Kafka-Kinesis-Connector to publish messages from MSK to the following destinations: Amazon Simple Storage Service (Amazon S3), Amazon Redshift, or Amazon OpenSearch Service.
- If you're setting up Kinesis Data Streams, then configure the connector with the following values:
    name=YOUR_CONNECTOR_NAME
    connector.class=com.amazon.kinesis.kafka.AmazonKinesisSinkConnector
    tasks.max=1
    topics=YOUR_TOPIC_NAME
    region=us-east-1
    streamName=YOUR_STREAM_NAME
    usePartitionAsHashKey=false
    flushSync=true
    # Use new Kinesis Producer for each Partition
    singleKinesisProducerPerPartition=true
    # Whether to block new records from putting onto Kinesis Producer if
    # threshold for outstanding records have reached
    pauseConsumption=true
    outstandingRecordsThreshold=500000
    # If outstanding records on producers are beyond threshold sleep for following period (in ms)
    sleepPeriod=1000
    # If outstanding records on producers are not cleared sleep for following cycle before killing the tasks
    sleepCycles=10
    # Kinesis Producer Configuration - https://github.com/awslabs/amazon-kinesis-producer/blob/main/java/amazon-kinesis-producer-sample/default_config.properties
    # All kinesis producer configuration have not been exposed
    maxBufferedTime=1500
    maxConnections=1
    rateLimit=100
    ttl=60000
    metricsLevel=detailed
    metricsGranuality=shard
    metricsNameSpace=KafkaKinesisStreamsConnector
    aggregation=true
  If you're setting up a different type of stream, then configure the Kinesis Data Firehose delivery stream properties with the following values:
    name=YOUR_CONNECTOR_NAME
    connector.class=com.amazon.kinesis.kafka.FirehoseSinkConnector
    tasks.max=1
    topics=YOUR_TOPIC_NAME
    region=us-east-1
    batch=true
    batchSize=500
    batchSizeInBytes=3670016
    deliveryStream=YOUR_DELIVERY_STREAM_NAME
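Before you start the connector, it can help to check that the properties file contains the keys that the example configurations use. The following Python sketch parses simple key=value lines and reports missing keys. It is an assumption-laden illustration: parse_properties and missing_keys are hypothetical helper names, the parser handles only the subset of the .properties format shown in this article, and the list of expected keys is taken from the examples here rather than from an official schema.

```python
def parse_properties(text):
    """Parse simple key=value lines; skip blank lines and # comments.

    Handles only the subset of the Java .properties format used in the
    example configurations (no line continuations or escapes).
    """
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if not sep:
            raise ValueError(f"not a key=value line: {raw!r}")
        props[key.strip()] = value.strip()
    return props


# Assumption: expected keys are taken from the example configurations.
COMMON_KEYS = ["name", "connector.class", "tasks.max", "topics", "region"]
TARGET_KEY = {
    "com.amazon.kinesis.kafka.AmazonKinesisSinkConnector": "streamName",
    "com.amazon.kinesis.kafka.FirehoseSinkConnector": "deliveryStream",
}


def missing_keys(props):
    """Return the expected keys that are absent from props."""
    expected = list(COMMON_KEYS)
    target = TARGET_KEY.get(props.get("connector.class", ""))
    if target:
        expected.append(target)
    return [key for key in expected if key not in props]


if __name__ == "__main__":
    sample = """
    name=YOUR_CONNECTOR_NAME
    connector.class=com.amazon.kinesis.kafka.FirehoseSinkConnector
    tasks.max=1
    topics=YOUR_TOPIC_NAME
    region=us-east-1
    """
    # The sample omits deliveryStream, so the check reports it.
    print(missing_keys(parse_properties(sample)))
```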
- Configure the worker properties for either standalone or distributed mode:
    bootstrap.servers=localhost:9092
    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.storage.StringConverter
    #internal.value.converter=org.apache.kafka.connect.storage.StringConverter
    #internal.key.converter=org.apache.kafka.connect.storage.StringConverter
    internal.value.converter=org.apache.kafka.connect.json.JsonConverter
    internal.key.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable=true
    value.converter.schemas.enable=true
    internal.key.converter.schemas.enable=true
    internal.value.converter.schemas.enable=true
    offset.storage.file.filename=offset.log
  For more information about Kafka-Kinesis-Connector's standalone or distributed mode, see Kafka Connect on the Apache website.
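The worker properties example uses localhost:9092 for bootstrap.servers. The following Python sketch renders the same settings with a broker list that you supply, on the assumption that the worker should point at your MSK brokers instead. The function name render_worker_properties and the broker hostnames are hypothetical placeholders, not values from this article.

```python
STRING_CONVERTER = "org.apache.kafka.connect.storage.StringConverter"
JSON_CONVERTER = "org.apache.kafka.connect.json.JsonConverter"


def render_worker_properties(bootstrap_servers, offset_file="offset.log"):
    """Return worker.properties text that mirrors the example settings.

    bootstrap_servers is a list of host:port strings (placeholders here).
    """
    if not bootstrap_servers:
        raise ValueError("at least one broker address is required")
    settings = {
        "bootstrap.servers": ",".join(bootstrap_servers),
        "key.converter": STRING_CONVERTER,
        "value.converter": STRING_CONVERTER,
        "internal.value.converter": JSON_CONVERTER,
        "internal.key.converter": JSON_CONVERTER,
        "key.converter.schemas.enable": "true",
        "value.converter.schemas.enable": "true",
        "internal.key.converter.schemas.enable": "true",
        "internal.value.converter.schemas.enable": "true",
        "offset.storage.file.filename": offset_file,
    }
    # One key=value pair per line, in the order defined above.
    return "".join(f"{key}={value}\n" for key, value in settings.items())


if __name__ == "__main__":
    # Hypothetical broker addresses; replace with your cluster's brokers.
    brokers = ["b-1.example.com:9092", "b-2.example.com:9092"]
    print(render_worker_properties(brokers), end="")
```

You could write the returned text to worker.properties and pass that file to the connect-standalone.sh script.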
- Copy the amazon-kinesis-kafka-connector-0.0.X.jar file to your directory, and then export the classpath.
  Note: You can also add the amazon-kinesis-kafka-connector-0.0.X.jar file to the JAVA_HOME/lib/ext directory.
- To run the Kafka-Kinesis-Connector, use the following command syntax:
    [ec2-user@ip-10-0-0-71 kafka_2.12-2.2.1]$ ./bin/connect-standalone.sh /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/worker.properties /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/kinesis-kafka-streams-connecter.properties
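The classpath and run steps can also be scripted. The following Python sketch builds the connect-standalone.sh argument list and an environment with the connector JAR on the classpath. It assumes that "export classpath" means setting the CLASSPATH environment variable to the JAR path. The helper names and the example paths under /home/ec2-user are illustrative, so adjust them to your installation.

```python
import os
from pathlib import PurePosixPath


def build_standalone_command(kafka_home, worker_props, connector_props):
    """Return the argument list for Kafka Connect standalone mode."""
    script = PurePosixPath(kafka_home) / "bin" / "connect-standalone.sh"
    return [str(script), str(worker_props), str(connector_props)]


def build_environment(jar_path, base_env=None):
    """Return a copy of the environment with the JAR added to CLASSPATH.

    Assumption: the connector JAR is picked up through CLASSPATH.
    """
    env = dict(os.environ if base_env is None else base_env)
    existing = env.get("CLASSPATH")
    env["CLASSPATH"] = f"{existing}:{jar_path}" if existing else str(jar_path)
    return env


if __name__ == "__main__":
    # Example paths only; they mirror the layout shown in this article.
    base = "/home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector"
    cmd = build_standalone_command(
        "/home/ec2-user/kafka_2.12-2.2.1",
        f"{base}/worker.properties",
        f"{base}/kinesis-kafka-streams-connecter.properties",
    )
    env = build_environment(
        f"{base}/target/amazon-kinesis-kafka-connector-0.0.9-SNAPSHOT.jar"
    )
    # Print instead of running so the sketch has no side effects.
    print(" ".join(cmd))
    print("CLASSPATH=" + env["CLASSPATH"])
```

To start the worker from the script, you could pass both values to subprocess.run(cmd, env=env, check=True).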