Enables developers to use AWS Identity and Access Management (IAM) to connect to their Amazon Managed Streaming for Apache Kafka (Amazon MSK) clusters.
This project is licensed under the Apache-2.0 License.
The Amazon MSK Library for AWS Identity and Access Management enables developers to use
AWS Identity and Access Management (IAM) to connect to their Amazon Managed Streaming for Apache Kafka (Amazon MSK) clusters.
It allows JVM based Apache Kafka clients to use AWS IAM for authentication and authorization against
Amazon MSK clusters that have AWS IAM enabled as an authentication mechanism.
This library provides a new Simple Authentication and Security Layer (SASL) mechanism called AWS_MSK_IAM
. This new
SASL mechanism can be used by Kafka clients to authenticate against Amazon MSK clusters using AWS IAM.
After you’ve downloaded the code from GitHub, you can build it using Gradle. Use this command:
gradle clean build
The generated jar files can be found at: build/libs/
.
An uber jar containing the library and all its relocated dependencies except the kafka client and slf4j-api
can
also be built. Use this command:
gradle clean shadowJar
The generated uber jar file can also be found at: build/libs/
. At runtime, the uber jar expects to find the kafka
client library and the sl4j-api
library on the classpath.
To ensure no security vulnerabilities in the dependency libraries, run the following.
gradle dependencyCheckAnalyze
If the above reports any vulnerabilities, upgrade dependencies to use the respective latest versions.
The recommended way to use this library is to consume it from maven central while building a Kafka client application.
<dependency>
<groupId>software.amazon.msk</groupId>
<artifactId>aws-msk-iam-auth</artifactId>
<version>2.2.0</version>
</dependency>
If you want to use it with a pre-existing Kafka client, you could build the uber jar and place it in the Kafka client’s
classpath.
You can configure a Kafka client to use AWS IAM for authentication by adding the following properties to the client’s
configuration.
# Sets up TLS for encryption and SASL for authN.
security.protocol = SASL_SSL
# Identifies the SASL mechanism to use.
sasl.mechanism = AWS_MSK_IAM
# Binds SASL client implementation.
sasl.jaas.config = software.amazon.msk.auth.iam.IAMLoginModule required;
# Encapsulates constructing a SigV4 signature based on extracted credentials.
# The SASL client bound by "sasl.jaas.config" invokes this class.
sasl.client.callback.handler.class = software.amazon.msk.auth.iam.IAMClientCallbackHandler
You can alternatively use SASL/OAUTHBEARER mechanism using IAM authentication by adding following configuration.
For more details on SASL/OAUTHBEARER mechanism, please read - KIP-255
# Sets up TLS for encryption and SASL for authN.
security.protocol=SASL_SSL
# Identifies the SASL mechanism to use.
sasl.mechanism=OAUTHBEARER
# Binds SASL client implementation. You can add client credential configurations here.
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
# Encapsulates constructing a SigV4 signature based on extracted credentials.
# The SASL client bound by "sasl.jaas.config" invokes this class.
sasl.login.callback.handler.class=software.amazon.msk.auth.iam.IAMOAuthBearerLoginCallbackHandler
# This is used during client authentication and reauthentication
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMOAuthBearerLoginCallbackHandler
This configuration finds IAM credentials using the AWS Default Credentials Provider Chain. To summarize,
the Default Credential Provider Chain looks for credentials in this order:
If the client wants to specify a particular credential profile as part of the client configuration rather than through
the environment variable AWS_PROFILE, they can pass in the name of the profile as a client configuration property:
# Binds SASL client implementation. Uses the specified profile name to look for credentials.
sasl.jaas.config = software.amazon.msk.auth.iam.IAMLoginModule required awsProfileName="<Credential Profile Name>";
Some clients may want to assume a role and use the role’s temporary credentials to communicate with a MSK
cluster. One way to do that is to create a credential profile for that role following the rules for
Using an IAM role in the CLI.
They can then pass in the name of the credential profile as described above.
As an example, let’s say a Kafka client is running on an Ec2 instance and the Kafka client wants to use an IAM role
called msk_client_role
. The Ec2 instance profile has permissions to assume the msk_client_role
IAM role although
msk_client_role
is not attached to the instance profile.
In such a case, we create a credential profile called msk_client
that assumes the role msk_client_role
.
The credential profile looks like:
[msk_client]
role_arn = arn:aws:iam::123456789012:role/msk_client_role
credential_source = Ec2InstanceMetadata
The credential profile name msk_client
is passed in as a client configuration property:
sasl.jaas.config = software.amazon.msk.auth.iam.IAMLoginModule required awsProfileName="msk_client";
Many more examples of configuring credential profiles with IAM roles can be found in Using an IAM role in the CLI.
The library supports another way to configure a client to assume an IAM role and use the role’s temporary credentials.
The IAM role’s ARN and optionally the session name for the client can be passed in as client configuration property:
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required awsRoleArn="arn:aws:iam::123456789012:role/msk_client_role" awsRoleSessionName="producer" awsStsRegion="us-west-2";
In this case, the awsRoleArn
specifies the ARN for the IAM role the client should use and awsRoleSessionName
specifies the session name that this particular client should use while assuming the IAM role. If the same IAM
Role is used in multiple contexts, the session names can be used to differentiate between the different contexts.
The awsRoleSessionName
is optional.
awsStsRegion
optionally specifies the regional endpoint of AWS STS to use
while assuming the IAM role. If awsStsRegion
is omitted the global endpoint for AWS STS is used by default.
When the Kafka client is running in a VPC with an STS interface VPC Endpoint (AWS PrivateLink) to a regional endpoint of AWS STS and we want
all STS traffic to go over that endpoint, we should set awsStsRegion
to the region corresponding to the interface
VPC Endpoint. It may also be necessary to configure the sts_regional_endpoints
shared AWS config file setting, or
the AWS_STS_REGIONAL_ENDPOINTS environment variable as per the AWS STS Regionalized endpoints documentation.
The Default Credential Provider Chain must contain the permissions necessary to assume the client role.
For example, if the client is an EC2 instance, its instance profile should have permission to assume the
msk_client_role
.
When you want the MSK client to connect to MSK using credentials not found in the AWS Default Credentials Provider Chain, you can specify an awsProfileName
containing the credential profile to use, or an awsRoleArn
to indicate an IAM Role’s ARN to assume using credentials in the Default Credential Provider Chain. These parameters are optional, and if they are not set the MSK client will use credentials from the Default Credential Provider Chain. There is no need to specify them if you intend to use an IAM role associated with an AWS compute service, such as EC2 or ECS to authenticate to MSK.
In some scenarios the IAM credentials might be transiently unavailable. This will cause the connection to fail, which
might in some cases cause the client application to stop.
So, in version 1.1.3
the library retries loading the credentials when it gets an SdkClientException
(which wraps
most AWS SDK
client side exceptions). Since the retries do not impact the fault-free path and we had heard of user
issues around random failures loading credentials (e.g.: #59, maybe
#51 ), we decided to change the default behavior
to retry a maximum of 3
times. It exponentially backs off with full jitter upto a max-delay of 2000 ms
.
The maximum number of retries and the maximum back off period can be set:
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required awsMaxRetries="7" awsMaxBackOffTimeMs="500";
This sets the maximum number of retries to 7
and the maximum back off time to 500 ms
.
The retries can be turned off completely by setting awsMaxRetries
to "0"
.
If your Kafka Client, Producer or Consumer, is running on EKS, you can use EKS service accounts to distribute IAM credentials. The EKS service account documentation is a good place to start. Following steps cover the scenario of cross account IAM access with EKS service account. Our set-up uses VPC peering for cross account network access, and managed EC2 node group on EKS side. Each step below is linked to AWS documentation for more details and troubleshooting:
--with-oidc
flag to use AWS Identity and Access Management (IAM) roles for service accounts.With console access to your EKS containers, as in the EKS example. You can connect, and download the latest version of Kafka on the container. It comes with the kafka CLI, that you can use for validation. As an example, you can set-up your config file and use the following command to test topic creation with IAM auth.
./kafka-topics.sh --bootstrap-server <borker-name>:9098 --create --topic test-topic --partitions 1 --replication-factor 3 --command-config <config_file>
A Kafka client configured to use AWS_MSK_IAM
may see an error that the IAMClientCallbackHandler
cannot be found:
Exception in thread "main" org.apache.kafka.common.config.ConfigException: Invalid value
software.amazon.msk.auth.iam.IAMClientCallbackHandler for configuration sasl.client.callback.handler.class:
Class software.amazon.msk.auth.iam.IAMClientCallbackHandler could not be found.
That means that this aws-msk-iam-auth
library is not on the classpath of the Kafka client. Please add the aws-msk-iam-auth
library
to the classpath and try again.
You may receive an Access denied
error and there may be some doubt as to which credential is being exactly used. The
credential may be sourced from a role ARN, EC2 instance profile, credential profile etc. This may be particularly so
when cross account access is being attempted.
If the client side logging is set to DEBUG
and the client configuration property includes awsDebugCreds
set to
true
:
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required awsDebugCreds=true;
the client library will print a debug log of the form:
DEBUG The identity of the credentials is {UserId: ARID4JIC6BCC6OK5OFDFGS:test124,Account: 12345678,Arn: arn:aws:sts::12345678:assumed-role/kada/test124} (software.amazon.msk.auth.iam.internals.MSKCredentialProvider)```
The log line provides the IAM Account, IAM user id and the ARN of the IAM Principal corresponding to the credential
being used.
The awsDebugCreds=true
parameter can be combined with any of the other parameters such as awsRoleArn
,
awsRoleSessionName
.
Please note that the log level should also be set to DEBUG
for this information to be logged.
It is not recommended to run with awsDebugCreds=true
since it makes an additional remote call.
You may receive an error, indicating that authentication has failed due to Too many connects
, similar to:
ERROR org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-x] Connection to node 3 (...) failed
authentication due to: [446c81dc-9ab3-4d4b-b174-4ecd9baa406c]: Too many connects
This is a sign that one or more IAM clients are trying to connect to a particular broker too many times per second and
the broker is protecting itself.
Setting the reconnect.backoff.ms
to at least 1000
should help clients backoff and retry
connections such that the broker does not need to reject new connections because of the connection rate.
The broker type determines the limit on the rate of new IAM connections per broker. Please note the limit is not about
the total number of connections per broker but the rate of new IAM connections per
broker. See the limits page for MSK for the limit on the rate of new IAM connections per broker for
different broker types.
While using the library from a Kafka Connect client, you may see an error of the form:
[Producer clientId=connector-...] Failed authentication with BROKER (An error: (java.security.PrivilegedActionException: javax.security.sasl.SaslException: Exception while evaluating challenge [Caused by javax.security.auth.callback.UnsupportedCallbackException: Unsupported callback type:
This most commonly occurs when two different class loaders are used to load different classes used by the library.
It can happen when the aws-msk-iam-auth
library is placed on the plugin path for Kafka Connect. Since the
library is actually used by the Kafka producer and consumer clients and not the Kafka Connect plugin itself,
it should be placed in a location that is on the classpath but outside the plugin path. This should ensure that Kafka
Connect’s PluginClassLoader
is not used to load classes for the aws-msk-iam-auth
library.
If you are building the library from source using gradle build
and copying it over to a Kafka client on that or
another machine, there is a chance that some dependencies may not be available on the Kafka client machine. In that
case, you could instead generate and use the uber jar that packages all the necessary runtime dependencies by
running gradle shadowJar
.
This library introduces a new SASL mechanism called AWS_MSK_IAM
. The IAMLoginModule
is used to register the
IAMSaslClientProvider
as a Provider
for the AWS_MSK_IAM
mechanism. The IAMSaslClientProvider
is used to
generate a new IAMSaslClient
every time a new connection to a Kafka broker is opened or an existing connection
is re-authenticated.
The IAMSaslClient
is used to perform the actual SASL authentication for a client. It evaluates challenges and creates
responses that can be used to authenticate a Kafka client to a Kafka broker configured with AWS IAM authentication
. Its initial response contains an authentication payload that includes a signature generated using the client’s
credentials. The IAMClientCallbackHandler
is used to extract the client credentials that are then used for
generating the signature.
The authentication payload and the signature are generated by the AWS4SignedPayloadGenerator
class based on the
parameters specified in AuthenticationRequestParams
. The authentication payload consists of a JSON object:
{
"version" : "2020_10_22",
"host" : "<broker address>",
"user-agent": "<user agent string from the client>",
"action": "kafka-cluster:Connect",
"x-amz-algorithm" : "<algorithm>",
"x-amz-credential" : "<clientAWSAccessKeyID>/<date in yyyyMMdd format>/<region>/kafka-cluster/aws4_request",
"x-amz-date" : "<timestamp in yyyyMMdd'T'HHmmss'Z' format>",
"x-amz-security-token" : "<clientAWSSessionToken if any>",
"x-amz-signedheaders" : "host",
"x-amz-expires" : "<expiration in seconds>",
"x-amz-signature" : "<AWS SigV4 signature computed by the client>"
}
Please note that all the keys in the authentication payload are in lowercase.
The values of the following keys in the authentication payload are fixed for a client:
"version"
currently always has the value "2020_10_22"
"user-agent"
is a string passed in by the client library to describe the client. The simplest user agent is "<name of client library>"
. However, more details can be added to the user agent as well "<name of client library /<version of client library>/<os and version>/<version of language>"
The values of the remaining keys will be generated as part of calculating the signature of the authentication payload
. The signature is calculated by following the rules for generating presigned urls. Although, this library
uses the AWS4Signer
from the AWS SDK for Java to generate the signature we outline the steps followed
in calculating the signature and generating the authentication payload from it.
The inputs to this calculation are
AWSAccessKeyId
,AWSSecretyKeyId
and the optional client SessionToken
.The steps in the calculation are:
We start by generating a canonical request with an empty payload based on the inputs.
The canonical request in this case has the following form
"GET\n"+
"/\n"+
<CanonicalQueryString>+"\n"+
<CanonicalHeaders>+"\n"+
<SignedHeaders>+"\n"+
<HashedPayload>
<CanonicalQueryString>
specifies the authentication parameters as URI-encoded query string parameters. We URI-encode
query parameter names and values individually. We also sort the parameters in the canonical query string
alphabetically by key name. The sorting occurs after encoding.
The <CanonicalQueryString>
can be calculated by:
UriEncode("Action")+"="+UriEncode("kafka-cluster:Connect")+"&"+
UriEncode("X-Amz-Algorithm")+"="+UriEncode("AWS4-HMAC-SHA256") + "&" +
UriEncode("X-Amz-Credential")+"="+UriEncode("<clientAWSAccessKeyID>/<timestamp in yyyyMMdd format>/<AWS region>/kafka-cluster/aws4_request") + "&" +
UriEncode("X-Amz-Date")+"="+UriEncode("<timestamp in yyyyMMdd'T'HHmmss'Z' format>") + "&" +
UriEncode("X-Amz-Expires")+"="+UriEncode("900") + "&" +
UriEncode("X-Amz-Security-Token")+"="+UriEncode("<client Session Token>") + "&" +
UriEncode("X-Amz-SignedHeaders")+"="+UriEncode("host")
The exact definition of URIEncode from generating presigned urls maybe found later.
The query string parameters are in order:
"Action"
: Always has the value "kafka-cluster:Connect"
"X-Amz-Algorithm"
: Describes the algorithm used to calculate the signature. Currently it is "AWS4-HMAC-SHA256"
"X-Amz-Credential"
: Contains the access key ID, timestamp in yyyyMMdd
format, the scope of the credential andaws4_request
. The scope is defined as the AWS region of the Kafka broker and the name of theus-west-2
region, the scope is us-west-2/kafka-cluster
."X-Amz-Date"
: The date and time format must follow the ISO 8601 standard, and must be formatted with the"X-Amz-Expires"
: Provides the time period, in seconds, for which the generated presigned URL is valid. We"X-Amz-Security-Token"
: The session token if it is specified as part of the AWS Credential. Otherwise this query"X-Amz-Signedheaders"
is currently always set to host
.<CanonicalHeaders>
is a list of request headers with their values. Header names must be in lowercase.
Individual header name and value pairs are separated by the newline character ("\n"
). In this case there is just one
header. So <CanonicalHeaders>
is calculated by:
"host"+":"+"<broker host name>"+"\n"
<SignedHeaders>
is an alphabetically sorted, semicolon-separated list of lowercase request header names. In this
case there is just one header. So <SignedHeaders>
is calculated by:
"host"
Since the payload is empty the <HashedPayload>
is calculated as
Hex(SHA256Hash(""))
where
Hex
is a function to do lowercase base 16 encoding.SHA256Hash
is a Secure Hash Algorithm (SHA) cryptographic hash function.From the canonical string, we derive the string that will be used to sign the authentication payload.
The String to Sign is calculated as:
"AWS4-HMAC-SHA256" + "\n" +
"<timestamp in yyyyMMdd format>" + "\n" +
<Scope> + "\n" +
Hex(SHA256Hash(<CanonicalRequest>))
where
Hex
is a function to do lowercase base 16 encoding.SHA256Hash
is a Secure Hash Algorithm (SHA) cryptographic hash function.The <Scope>
is defined as the AWS region of the Kafka broker and the name of the service (“kafka-cluster” in this
case). For example if the broker is in us-west-2
region, the scope is "us-west-2/kafka-cluster"
. It must be the same scope
as was defined for the "X-Amz-Credential"
query parameter while generating the canonical query string.
The signature is calculated by:
DateKey = HMAC-SHA256("AWS4"+"<client AWSSecretAccessKey>", "<timestanp in YYYYMMDD>")
DateRegionKey = HMAC-SHA256(<DateKey>, "<aws-region>")
DateRegionServiceKey = HMAC-SHA256(<DateRegionKey>, "kafka-cluster")
SigningKey = HMAC-SHA256(<DateRegionServiceKey>, "aws4_request")
Signature = Hex(HMAC-SHA256(<SigningKey>, <StringToSign>))
where
Hex
is a function to do lowercase base 16 encoding.HMAC-SHA256
is a function that computes HMAC by using the SHA256 algorithm with the signing key provided.The <Signature>
is the final signature.
As mentioned earlier, the authentication payload is a json object with certain keys. All the keys are in lower case.
The following keys in the authentication payload json are constant:
"version"
"user-agent"
All the query parameters calculated earlier are added to the authentication payload json. The keys are lower case
strings and the values are the ones calculated earlier but the values are not uri encoded:
"action"
"x-amz-algorithm"
"x-amz-credential"
"x-amz-date"
"x-amz-expires"
"x-amz-security-token"
"x-amz-signedheaders"
The host header is added to the authentication payload json
"host"
and its value is set to the hostname of the broker being connected.The <Signature>
calculated in the previous step is added to the authentication payload json as
"x-amz-signature"
and its value is set to<Signature>
This finally yields the authentication payload that looks like
{
"version" : "2020_10_22",
"host" : "<broker address>",
"user-agent": "<user agent string from the client>",
"action": "kafka-cluster:Connect",
"x-amz-algorithm" : "<algorithm>",
"x-amz-credential" : "<clientAWSAccessKeyID>/<date in yyyyMMdd format>/<region>/kafka-cluster/aws4_request",
"x-amz-date" : "<timestamp in yyyyMMdd'T'HHmmss'Z' format>",
"x-amz-security-token" : "<clientAWSSessionToken if any>",
"x-amz-signedheaders" : "host",
"x-amz-expires" : "<expiration in seconds>",
"x-amz-signature" : "<AWS SigV4 signature computed by the client>"
}
This authentication payload is sent as the first message from the client to the Kafka broker. The kafka broker then
responds with a challenge. We expect a non-empty response from the broker if authentication using AWS IAM succeeded.
The authentication response is a json object that may be logged:
{
"version" : "2020_10_22",
"request-id" : "<broker generated request id>"
}
The request-id
which is generated by the broker can be useful for debugging issues with AWS IAM authentication
between the client and the broker.
Snipped from the detailed rules for generating presigned urls.
URI encode every byte. UriEncode() must enforce the following rules:
The following is an example UriEncode() function in Java.
public class UriEncode {
public static String UriEncode(CharSequence input, boolean encodeSlash) {
StringBuilder result = new StringBuilder();
for (int i = 0; i < input.length(); i++) {
char ch = input.charAt(i);
if ((ch >= 'A' && ch <= 'Z') || (ch >= 'a' && ch <= 'z') || (ch >= '0' && ch <= '9') || ch == '_' || ch == '-' || ch == '~' || ch == '.') {
result.append(ch);
} else if (ch == '/') {
result.append(encodeSlash ? "%2F" : ch);
} else {
result.append(toHexUTF8(ch));
}
}
return result.toString();
}
}
sasl.jaas.config
awsDebugCreds=true
, do not fail the connection.IAMClientCallbackHandler
and AWSCredentialsCallback
classes.slf4j-api
.See CONTRIBUTING for more information.