From f6134919c0bd9d4a673b7fc5d471ff0a8a5f3e6e Mon Sep 17 00:00:00 2001 From: wforget <643348094@qq.com> Date: Mon, 11 Mar 2024 15:02:19 +0800 Subject: [PATCH 1/2] Remove `flink.` prefix for create session configurations --- .../engine/flink/session/FlinkSQLSessionManager.scala | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala index 3bcb488e8dd..acd706970be 100644 --- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala +++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala @@ -54,19 +54,22 @@ class FlinkSQLSessionManager(engineContext: DefaultContext) password: String, ipAddress: String, conf: Map[String, String]): Session = { - conf.get(KYUUBI_SESSION_HANDLE_KEY).map(SessionHandle.fromUUID).flatMap( + val newConf = conf.map { + case (k, v) => k.stripPrefix("flink.") -> v + } + newConf.get(KYUUBI_SESSION_HANDLE_KEY).map(SessionHandle.fromUUID).flatMap( getSessionOption).getOrElse { val flinkInternalSession = sessionManager.openSession( SessionEnvironment.newBuilder .setSessionEndpointVersion(SqlGatewayRestAPIVersion.V1) - .addSessionConfig(mapAsJavaMap(conf)) + .addSessionConfig(mapAsJavaMap(newConf)) .build) val session = new FlinkSessionImpl( protocol, user, password, ipAddress, - conf, + newConf, this, flinkInternalSession) session From fc750dc5a5f2331fb20f324a6f236782a12c77aa Mon Sep 17 00:00:00 2001 From: wforget <643348094@qq.com> Date: Tue, 12 Mar 2024 16:36:30 +0800 Subject: [PATCH 2/2] comment --- .../session/FlinkSQLSessionManager.scala | 38 +++++++++---------- 1 file changed, 18 insertions(+), 20 deletions(-) diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala index acd706970be..2c6328e30fd 100644 --- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala +++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala @@ -54,26 +54,24 @@ class FlinkSQLSessionManager(engineContext: DefaultContext) password: String, ipAddress: String, conf: Map[String, String]): Session = { - val newConf = conf.map { - case (k, v) => k.stripPrefix("flink.") -> v - } - newConf.get(KYUUBI_SESSION_HANDLE_KEY).map(SessionHandle.fromUUID).flatMap( - getSessionOption).getOrElse { - val flinkInternalSession = sessionManager.openSession( - SessionEnvironment.newBuilder - .setSessionEndpointVersion(SqlGatewayRestAPIVersion.V1) - .addSessionConfig(mapAsJavaMap(newConf)) - .build) - val session = new FlinkSessionImpl( - protocol, - user, - password, - ipAddress, - newConf, - this, - flinkInternalSession) - session - } + val normalizedConf = conf.map { case (k, v) => k.stripPrefix("flink.") -> v } + normalizedConf.get(KYUUBI_SESSION_HANDLE_KEY).map(SessionHandle.fromUUID) + .flatMap(getSessionOption).getOrElse { + val flinkInternalSession = sessionManager.openSession( + SessionEnvironment.newBuilder + .setSessionEndpointVersion(SqlGatewayRestAPIVersion.V1) + .addSessionConfig(mapAsJavaMap(normalizedConf)) + .build) + val session = new FlinkSessionImpl( + protocol, + user, + password, + ipAddress, + normalizedConf, + this, + flinkInternalSession) + session + } } override def getSessionOption(sessionHandle: SessionHandle): Option[Session] = {