diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index bc28ce5eeae72..9d2c1c1cd7e5d 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -42,7 +42,7 @@ import org.apache.hadoop.fs._ import org.apache.hadoop.fs.permission.FsPermission import org.apache.hadoop.io.Text import org.apache.hadoop.mapreduce.MRJobConfig -import org.apache.hadoop.security.{Credentials, UserGroupInformation} +import org.apache.hadoop.security.{Credentials, UserGroupInformation, SecurityUtil} import org.apache.hadoop.security.token.{TokenIdentifier, Token} import org.apache.hadoop.util.StringUtils import org.apache.hadoop.yarn.api._ @@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.api.records._ import org.apache.hadoop.yarn.client.api.{YarnClient, YarnClientApplication} import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException +import org.apache.hadoop.yarn.util.ConverterUtils import org.apache.hadoop.yarn.util.Records import org.apache.spark.deploy.SparkHadoopUtil @@ -268,6 +269,7 @@ private[spark] class Client( val distributedUris = new HashSet[String] obtainTokenForHiveMetastore(hadoopConf, credentials) obtainTokenForHBase(hadoopConf, credentials) + obtainTokenForResourceManager(hadoopConf, credentials, yarnClient) val replication = sparkConf.getInt("spark.yarn.submit.file.replication", fs.getDefaultReplication(dst)).toShort @@ -1313,6 +1315,32 @@ object Client extends Logging { } } + /** + * Obtain security delegation token for the ResourceManager + */ + def obtainTokenForResourceManager(conf: Configuration, credentials: Credentials, yarnClient: YarnClient): Unit = { + if (UserGroupInformation.isSecurityEnabled) { + logDebug("Requesting RM delegation token") + + val rmAddress = conf.getSocketAddr(YarnConfiguration.RM_ADDRESS, + YarnConfiguration.DEFAULT_RM_ADDRESS, + YarnConfiguration.DEFAULT_RM_PORT) + + val renewer = SecurityUtil.getServerPrincipal( + conf.get(YarnConfiguration.RM_PRINCIPAL), + rmAddress.getHostName) + + val protoToken = yarnClient.getRMDelegationToken(new Text(renewer)) + + // rmAddress should be removed later. See also TEZ-269 + val token = ConverterUtils.convertFromYarn(protoToken, rmAddress) + + credentials.addToken(new Text(token.getService), token) + + logInfo(s"RM delegation token added: service ${token.getService} with renewer ${renewer}") + } + } + /** * Return whether the two file systems are the same. */