From 04914705ccc500d16b4e14d90a20a6f67aa71090 Mon Sep 17 00:00:00 2001 From: Bolke de Bruin Date: Sat, 18 Jul 2015 10:37:14 +0200 Subject: [PATCH] Add RM delegation token to launch environment In case of a kerberized cluster it is required to have a delegation token for the resource manager in the launch context, otherwise the job will fail when using "--master yarn-cluster". This closes issue SPARK-9019 --- .../org/apache/spark/deploy/yarn/Client.scala | 30 ++++++++++++++++++- 1 file changed, 29 insertions(+), 1 deletion(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index bc28ce5eeae72..9d2c1c1cd7e5d 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -42,7 +42,7 @@ import org.apache.hadoop.fs._ import org.apache.hadoop.fs.permission.FsPermission import org.apache.hadoop.io.Text import org.apache.hadoop.mapreduce.MRJobConfig -import org.apache.hadoop.security.{Credentials, UserGroupInformation} +import org.apache.hadoop.security.{Credentials, UserGroupInformation, SecurityUtil} import org.apache.hadoop.security.token.{TokenIdentifier, Token} import org.apache.hadoop.util.StringUtils import org.apache.hadoop.yarn.api._ @@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.api.records._ import org.apache.hadoop.yarn.client.api.{YarnClient, YarnClientApplication} import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException +import org.apache.hadoop.yarn.util.ConverterUtils import org.apache.hadoop.yarn.util.Records import org.apache.spark.deploy.SparkHadoopUtil @@ -268,6 +269,7 @@ private[spark] class Client( val distributedUris = new HashSet[String] obtainTokenForHiveMetastore(hadoopConf, credentials) obtainTokenForHBase(hadoopConf, credentials) + obtainTokenForResourceManager(hadoopConf, credentials, yarnClient) val replication = sparkConf.getInt("spark.yarn.submit.file.replication", fs.getDefaultReplication(dst)).toShort @@ -1313,6 +1315,32 @@ object Client extends Logging { } } + /** + * Obtain security delegation token for the ResourceManager + */ + def obtainTokenForResourceManager(conf: Configuration, credentials: Credentials, yarnClient: YarnClient): Unit = { + if (UserGroupInformation.isSecurityEnabled) { + logDebug("Requesting RM delegation token") + + val rmAddress = conf.getSocketAddr(YarnConfiguration.RM_ADDRESS, + YarnConfiguration.DEFAULT_RM_ADDRESS, + YarnConfiguration.DEFAULT_RM_PORT) + + val renewer = SecurityUtil.getServerPrincipal( + conf.get(YarnConfiguration.RM_PRINCIPAL), + rmAddress.getHostName) + + val protoToken = yarnClient.getRMDelegationToken(new Text(renewer)) + + // rmAddress should be removed later. See also TEZ-269 + val token = ConverterUtils.convertFromYarn(protoToken, rmAddress) + + credentials.addToken(new Text(token.getService), token) + + logInfo(s"RM delegation token added: service ${token.getService} with renewer ${renewer}") + } + } + /** * Return whether the two file systems are the same. */