From 1a7848546783a34809aa0d21af6b44fc4d2d7f52 Mon Sep 17 00:00:00 2001 From: Andy Pfister Date: Sat, 15 Mar 2025 23:36:34 +0100 Subject: [PATCH 1/8] Support `supports_insert_on_duplicate_skip` --- .../sqlserver/database_statements.rb | 68 +++++++++++++++++++ .../connection_adapters/sqlserver_adapter.rb | 2 +- 2 files changed, 69 insertions(+), 1 deletion(-) diff --git a/lib/active_record/connection_adapters/sqlserver/database_statements.rb b/lib/active_record/connection_adapters/sqlserver/database_statements.rb index d5dc267ea..997f45ce9 100644 --- a/lib/active_record/connection_adapters/sqlserver/database_statements.rb +++ b/lib/active_record/connection_adapters/sqlserver/database_statements.rb @@ -154,6 +154,73 @@ def default_insert_value(column) private :default_insert_value def build_insert_sql(insert) # :nodoc: + if insert.skip_duplicates? + insert_all = insert.send(:insert_all) + conflict_columns = get_conflicted_columns(insert_all:, insert:) + + # if we do not have any columns that might have conflicting values, just execute a regular insert + return build_sql_for_regular_insert(insert) if conflict_columns.empty? + + make_inserts_unique(insert_all:, conflict_columns:) + + primary_keys_for_insert = insert_all.primary_keys.to_set + + # if we receive a composite primary key, MSSQL will not be happy when we want to call "IDENTITY_INSERT" + # as there is likely no IDENTITY column + # so we need to check if there is exacty one + # TODO: Refactor to use existing "SET IDENTITY_INSERT" settings + enable_identity_insert = primary_keys_for_insert.length == 1 && + (insert_all.primary_keys.to_set & insert.keys).present? + + sql = +"" + sql << "SET IDENTITY_INSERT #{insert.model.quoted_table_name} ON;" if enable_identity_insert + sql << "MERGE INTO #{insert.model.quoted_table_name} WITH (UPDLOCK, HOLDLOCK) AS target" + sql << " USING (SELECT DISTINCT * FROM (#{insert.values_list}) AS t1 (#{insert.send(:columns_list)})) AS source" + sql << " ON (#{conflict_columns.map do |columns| + columns.map do |column| + "target.#{quote_column_name(column)} = source.#{quote_column_name(column)}" + end.join(" AND ") + end.join(") OR (")})" + sql << " WHEN NOT MATCHED BY TARGET THEN" + sql << " INSERT (#{insert.send(:columns_list)}) #{insert.values_list}" + if (returning = insert_all.returning) + sql << " OUTPUT " << returning.map { |column| "INSERTED.#{quote_column_name(column)}" }.join(", ") + end + sql << ";" + sql << "SET IDENTITY_INSERT #{insert.model.quoted_table_name} OFF;" if enable_identity_insert + return sql + end + + build_sql_for_regular_insert(insert) + end + + # MERGE executes a JOIN between our data we would like to insert and the existing data in the table + # but since it is a JOIN, it requires the data in the source also to be unique (aka our values to insert) + # here we modify @inserts in place of the "insert_all" object to be unique + # keeping the last occurence + # note that for other DBMS, this job is usually handed off to them by specifying something like + # "ON DUPLICATE SKIP" or "ON DUPLICATE UPDATE" + def make_inserts_unique(insert_all:, conflict_columns:) + unique_inserts = insert_all.inserts.reverse.uniq { |insert| conflict_columns.map { |column| insert[column] } }.reverse + insert_all.instance_variable_set(:@inserts, unique_inserts) + end + private :make_inserts_unique + + def get_conflicted_columns(insert_all:, insert:) + if (unique_by = insert_all.unique_by) + [unique_by.columns] + else + # Compare against every unique constraint (primary key included). 
+ # Discard constraints that are not fully included on insert.keys. Prevents invalid queries. + # Example: ignore unique index for columns ["name"] if insert keys is ["description"] + (insert_all.send(:unique_indexes).map(&:columns) + [insert_all.primary_keys]).select do |columns| + columns.to_set.subset?(insert.keys) + end + end + end + private :get_conflicted_columns + + def build_sql_for_regular_insert(insert) sql = "INSERT #{insert.into}" returning = insert.send(:insert_all).returning @@ -170,6 +237,7 @@ def build_insert_sql(insert) # :nodoc: sql << " #{insert.values_list}" sql end + private :build_sql_for_regular_insert # === SQLServer Specific ======================================== # diff --git a/lib/active_record/connection_adapters/sqlserver_adapter.rb b/lib/active_record/connection_adapters/sqlserver_adapter.rb index 170f3286b..a1dab6f37 100644 --- a/lib/active_record/connection_adapters/sqlserver_adapter.rb +++ b/lib/active_record/connection_adapters/sqlserver_adapter.rb @@ -216,7 +216,7 @@ def supports_insert_returning? end def supports_insert_on_duplicate_skip? - false + true end def supports_insert_on_duplicate_update? From c0be62cbb71cfd8c1130703f4574807e5e4ea404 Mon Sep 17 00:00:00 2001 From: Andy Pfister Date: Sun, 16 Mar 2025 14:34:31 +0100 Subject: [PATCH 2/8] Support `supports_insert_on_duplicate_update?` --- .../sqlserver/database_statements.rb | 117 ++++++++++++------ .../connection_adapters/sqlserver_adapter.rb | 2 +- test/cases/coerced_tests.rb | 12 ++ 3 files changed, 92 insertions(+), 39 deletions(-) diff --git a/lib/active_record/connection_adapters/sqlserver/database_statements.rb b/lib/active_record/connection_adapters/sqlserver/database_statements.rb index 997f45ce9..659b269f0 100644 --- a/lib/active_record/connection_adapters/sqlserver/database_statements.rb +++ b/lib/active_record/connection_adapters/sqlserver/database_statements.rb @@ -154,14 +154,12 @@ def default_insert_value(column) private :default_insert_value def build_insert_sql(insert) # :nodoc: - if insert.skip_duplicates? + if insert.skip_duplicates? || insert.update_duplicates? insert_all = insert.send(:insert_all) conflict_columns = get_conflicted_columns(insert_all:, insert:) # if we do not have any columns that might have conflicting values, just execute a regular insert - return build_sql_for_regular_insert(insert) if conflict_columns.empty? - - make_inserts_unique(insert_all:, conflict_columns:) + return build_sql_for_regular_insert(insert) if conflict_columns.flatten.empty? primary_keys_for_insert = insert_all.primary_keys.to_set @@ -172,39 +170,92 @@ def build_insert_sql(insert) # :nodoc: enable_identity_insert = primary_keys_for_insert.length == 1 && (insert_all.primary_keys.to_set & insert.keys).present? 
- sql = +"" - sql << "SET IDENTITY_INSERT #{insert.model.quoted_table_name} ON;" if enable_identity_insert - sql << "MERGE INTO #{insert.model.quoted_table_name} WITH (UPDLOCK, HOLDLOCK) AS target" - sql << " USING (SELECT DISTINCT * FROM (#{insert.values_list}) AS t1 (#{insert.send(:columns_list)})) AS source" - sql << " ON (#{conflict_columns.map do |columns| - columns.map do |column| - "target.#{quote_column_name(column)} = source.#{quote_column_name(column)}" - end.join(" AND ") - end.join(") OR (")})" - sql << " WHEN NOT MATCHED BY TARGET THEN" - sql << " INSERT (#{insert.send(:columns_list)}) #{insert.values_list}" - if (returning = insert_all.returning) - sql << " OUTPUT " << returning.map { |column| "INSERTED.#{quote_column_name(column)}" }.join(", ") + # why is the "PARTITION BY" clause needed? + # in every DBMS system, insert_all / upsert_all is usually implemented with INSERT, that allows to define what happens + # when duplicates are found (SKIP OR UPDATE) + # by default rows are considered to be unique by every unique index on the table + # but since we have to use MERGE in MSSQL, which in return is a JOIN, we have to perform the "de-duplication" ourselves + # otherwise the "JOIN" clause would complain about non-unique values and being unable to JOIN the two tables + # this works easiest by using PARTITION and make sure that any record + # we are trying to insert is "the first one seen across all the potential conflicted columns" + sql = <<~SQL + #{"SET IDENTITY_INSERT #{insert.model.quoted_table_name} ON;" if enable_identity_insert} + MERGE INTO #{insert.model.quoted_table_name} WITH (UPDLOCK, HOLDLOCK) AS target + USING ( + SELECT * + FROM ( + SELECT #{insert.send(:columns_list)}, #{conflict_columns.map.with_index do |group_of_conflicted_columns, index| + <<~PARTITION_BY + ROW_NUMBER() OVER ( + PARTITION BY #{group_of_conflicted_columns.map { |column| quote_column_name(column) }.join(",")} + ORDER BY #{group_of_conflicted_columns.map { |column| "#{quote_column_name(column)} DESC" }.join(",")} + ) AS rn_#{index} + PARTITION_BY + end.join(", ") + } + FROM (#{insert.values_list}) + AS t1 (#{insert.send(:columns_list)}) + ) AS ranked_source + WHERE #{conflict_columns.map.with_index do |group_of_conflicted_columns, index| + "rn_#{index} = 1" + end.join(" AND ") + } + ) AS source + ON (#{conflict_columns.map do |columns| + columns.map do |column| + "target.#{quote_column_name(column)} = source.#{quote_column_name(column)}" + end.join(" AND ") + end.join(") OR (")}) + SQL + + if insert.update_duplicates? + sql << " WHEN MATCHED THEN UPDATE SET " + + if insert.raw_update_sql? + sql << insert.raw_update_sql + else + if insert.record_timestamps? 
+ sql << insert.model.timestamp_attributes_for_update_in_model.filter_map do |column_name| + if insert.send(:touch_timestamp_attribute?, column_name) + "target.#{quote_column_name(column_name)}=CASE WHEN (#{insert.updatable_columns.map { |column| "(COALESCE(target.#{quote_column_name(column)}, 'NULL') = COALESCE(source.#{quote_column_name(column)}, 'NULL'))" }.join(" AND ")}) THEN target.#{quote_column_name(column_name)} ELSE #{high_precision_current_timestamp} END," + end + end.join + end + + sql << insert.updatable_columns.map { |column| "target.#{quote_column_name(column)}=source.#{quote_column_name(column)}" }.join(",") + end end + sql << " WHEN NOT MATCHED BY TARGET THEN" + sql << " INSERT (#{insert.send(:columns_list)}) VALUES (#{insert_all.keys_including_timestamps.map { |column| "source.#{quote_column_name(column)}" }.join(", ")})" + sql << build_sql_for_returning(insert:, insert_all: insert.send(:insert_all)) + sql << ";" sql << "SET IDENTITY_INSERT #{insert.model.quoted_table_name} OFF;" if enable_identity_insert + return sql end build_sql_for_regular_insert(insert) end - # MERGE executes a JOIN between our data we would like to insert and the existing data in the table - # but since it is a JOIN, it requires the data in the source also to be unique (aka our values to insert) - # here we modify @inserts in place of the "insert_all" object to be unique - # keeping the last occurence - # note that for other DBMS, this job is usually handed off to them by specifying something like - # "ON DUPLICATE SKIP" or "ON DUPLICATE UPDATE" - def make_inserts_unique(insert_all:, conflict_columns:) - unique_inserts = insert_all.inserts.reverse.uniq { |insert| conflict_columns.map { |column| insert[column] } }.reverse - insert_all.instance_variable_set(:@inserts, unique_inserts) + def build_sql_for_returning(insert:, insert_all:) + return "" unless insert_all.returning + + returning_values_sql = if insert_all.returning.is_a?(String) + insert_all.returning + else + Array(insert_all.returning).map do |attribute| + if insert.model.attribute_alias?(attribute) + "INSERTED.#{quote_column_name(insert.model.attribute_alias(attribute))} AS #{quote_column_name(attribute)}" + else + "INSERTED.#{quote_column_name(attribute)}" + end + end.join(",") + end + + " OUTPUT #{returning_values_sql}" end - private :make_inserts_unique + private :build_sql_for_returning def get_conflicted_columns(insert_all:, insert:) if (unique_by = insert_all.unique_by) @@ -223,17 +274,7 @@ def get_conflicted_columns(insert_all:, insert:) def build_sql_for_regular_insert(insert) sql = "INSERT #{insert.into}" - returning = insert.send(:insert_all).returning - - if returning - returning_sql = if returning.is_a?(String) - returning - else - Array(returning).map { |column| "INSERTED.#{quote_column_name(column)}" }.join(", ") - end - sql << " OUTPUT #{returning_sql}" - end - + sql << build_sql_for_returning(insert:, insert_all: insert.send(:insert_all)) sql << " #{insert.values_list}" sql end diff --git a/lib/active_record/connection_adapters/sqlserver_adapter.rb b/lib/active_record/connection_adapters/sqlserver_adapter.rb index a1dab6f37..3eea37906 100644 --- a/lib/active_record/connection_adapters/sqlserver_adapter.rb +++ b/lib/active_record/connection_adapters/sqlserver_adapter.rb @@ -220,7 +220,7 @@ def supports_insert_on_duplicate_skip? end def supports_insert_on_duplicate_update? - false + true end def supports_insert_conflict_target? 
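
With both `supports_insert_on_duplicate_skip?` and `supports_insert_on_duplicate_update?` now returning `true`, Active Record's standard bulk APIs route through the `MERGE` statement built above. Below is a minimal usage sketch, assuming the `Book` model from the test suite with the unique index on `[:author_id, :name]` added there; the exact column values are illustrative only and not part of this patch.

```ruby
# Rows that collide on a unique index (or the primary key) no longer raise a
# unique constraint violation: they are skipped via the MERGE join.
Book.insert_all(
  [
    { author_id: 8, name: "Refactoring" },
    { author_id: 8, name: "Refactoring" } # duplicate of the row above, collapsed away before the MERGE
  ]
)

# With upsert_all the matching row is updated instead of skipped;
# unique_by selects which uniqueness constraint drives the MERGE join.
Book.upsert_all(
  [{ author_id: 8, name: "Refactoring", status: 1 }],
  unique_by: [:author_id, :name]
)
```
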
diff --git a/test/cases/coerced_tests.rb b/test/cases/coerced_tests.rb index 3d56cf48f..01e26ab6f 100644 --- a/test/cases/coerced_tests.rb +++ b/test/cases/coerced_tests.rb @@ -2552,6 +2552,18 @@ def test_insert_with_type_casting_and_serialize_is_consistent_coerced Book.where(author_id: nil, name: '["Array"]').delete_all Book.lease_connection.add_index(:books, [:author_id, :name], unique: true) end + + # Same as original but using target.status for assignment and GREATEST for operator + coerce_tests! :test_upsert_all_updates_using_provided_sql + def test_upsert_all_updates_using_provided_sql_coerced + Book.upsert_all( + [{id: 1, status: 1}, {id: 2, status: 1}], + on_duplicate: Arel.sql("target.status = GREATEST(target.status, 1)") + ) + + assert_equal "published", Book.find(1).status + assert_equal "written", Book.find(2).status + end end module ActiveRecord From 3f88a743ea936d36cac999cc2ecbc6715fd5dccd Mon Sep 17 00:00:00 2001 From: Andy Pfister Date: Sun, 16 Mar 2025 16:33:33 +0100 Subject: [PATCH 3/8] extend "query_requires_identity_insert?" for merge queries with inserts --- .../sqlserver/database_statements.rb | 23 ++++++++----------- .../sqlserver/schema_statements.rb | 2 ++ test/cases/adapter_test_sqlserver.rb | 12 +++++++++- test/cases/schema_test_sqlserver.rb | 18 +++++++++++++++ 4 files changed, 41 insertions(+), 14 deletions(-) diff --git a/lib/active_record/connection_adapters/sqlserver/database_statements.rb b/lib/active_record/connection_adapters/sqlserver/database_statements.rb index 659b269f0..c9e1253ff 100644 --- a/lib/active_record/connection_adapters/sqlserver/database_statements.rb +++ b/lib/active_record/connection_adapters/sqlserver/database_statements.rb @@ -161,15 +161,6 @@ def build_insert_sql(insert) # :nodoc: # if we do not have any columns that might have conflicting values, just execute a regular insert return build_sql_for_regular_insert(insert) if conflict_columns.flatten.empty? - primary_keys_for_insert = insert_all.primary_keys.to_set - - # if we receive a composite primary key, MSSQL will not be happy when we want to call "IDENTITY_INSERT" - # as there is likely no IDENTITY column - # so we need to check if there is exacty one - # TODO: Refactor to use existing "SET IDENTITY_INSERT" settings - enable_identity_insert = primary_keys_for_insert.length == 1 && - (insert_all.primary_keys.to_set & insert.keys).present? - # why is the "PARTITION BY" clause needed? # in every DBMS system, insert_all / upsert_all is usually implemented with INSERT, that allows to define what happens # when duplicates are found (SKIP OR UPDATE) @@ -179,7 +170,7 @@ def build_insert_sql(insert) # :nodoc: # this works easiest by using PARTITION and make sure that any record # we are trying to insert is "the first one seen across all the potential conflicted columns" sql = <<~SQL - #{"SET IDENTITY_INSERT #{insert.model.quoted_table_name} ON;" if enable_identity_insert} + MERGE INTO #{insert.model.quoted_table_name} WITH (UPDLOCK, HOLDLOCK) AS target USING ( SELECT * @@ -230,7 +221,6 @@ def build_insert_sql(insert) # :nodoc: sql << build_sql_for_returning(insert:, insert_all: insert.send(:insert_all)) sql << ";" - sql << "SET IDENTITY_INSERT #{insert.model.quoted_table_name} OFF;" if enable_identity_insert return sql end @@ -527,11 +517,18 @@ def query_requires_identity_insert?(sql) raw_table_name = get_raw_table_name(sql) id_column = identity_columns(raw_table_name).first - (id_column && sql =~ /^\s*(INSERT|EXEC sp_executesql N'INSERT)[^(]+\([^)]*\b(#{id_column.name})\b,?[^)]*\)/i) ? 
SQLServer::Utils.extract_identifiers(raw_table_name).quoted : false + if id_column && ( + sql =~ /^\s*(INSERT|EXEC sp_executesql N'INSERT)[^(]+\([^)]*\b(#{id_column.name})\b,?[^)]*\)/i || + sql =~ /^\s*MERGE INTO.+THEN INSERT \([^)]*\b(#{id_column.name})\b,?[^)]*\)/im + ) + SQLServer::Utils.extract_identifiers(raw_table_name).quoted + else + false + end end def insert_sql?(sql) - !(sql =~ /\A\s*(INSERT|EXEC sp_executesql N'INSERT)/i).nil? + !(sql =~ /\A\s*(INSERT|EXEC sp_executesql N'INSERT|MERGE INTO.+THEN INSERT)/im).nil? end def identity_columns(table_name) diff --git a/lib/active_record/connection_adapters/sqlserver/schema_statements.rb b/lib/active_record/connection_adapters/sqlserver/schema_statements.rb index 140cc89e1..e6b1b8798 100644 --- a/lib/active_record/connection_adapters/sqlserver/schema_statements.rb +++ b/lib/active_record/connection_adapters/sqlserver/schema_statements.rb @@ -759,6 +759,8 @@ def get_raw_table_name(sql) .match(/\s*([^(]*)/i)[0] elsif s.match?(/^\s*UPDATE\s+.*/i) s.match(/UPDATE\s+([^\(\s]+)\s*/i)[1] + elsif s.match?(/^\s*MERGE INTO.*/i) + s.match(/^\s*MERGE\s+INTO\s+(\[?[a-z_ -]+\]?\.?\[?[a-z_ -]+\]?)\s+(AS|WITH|USING)/i)[1] else s.match(/FROM[\s|\(]+((\[[^\(\]]+\])|[^\(\s]+)\s*/i)[1] end.strip diff --git a/test/cases/adapter_test_sqlserver.rb b/test/cases/adapter_test_sqlserver.rb index a5403775a..3ae39d394 100644 --- a/test/cases/adapter_test_sqlserver.rb +++ b/test/cases/adapter_test_sqlserver.rb @@ -13,6 +13,7 @@ class AdapterTestSQLServer < ActiveRecord::TestCase fixtures :tasks let(:basic_insert_sql) { "INSERT INTO [funny_jokes] ([name]) VALUES('Knock knock')" } + let(:basic_merge_sql) { "MERGE INTO [ships] WITH (UPDLOCK, HOLDLOCK) AS target USING ( SELECT * FROM ( SELECT [id], [name], ROW_NUMBER() OVER ( PARTITION BY [id] ORDER BY [id] DESC ) AS rn_0 FROM ( VALUES (101, N'RSS Sir David Attenborough') ) AS t1 ([id], [name]) ) AS ranked_source WHERE rn_0 = 1 ) AS source ON (target.[id] = source.[id]) WHEN MATCHED THEN UPDATE SET target.[name] = source.[name]" } let(:basic_update_sql) { "UPDATE [customers] SET [address_street] = NULL WHERE [id] = 2" } let(:basic_select_sql) { "SELECT * FROM [customers] WHERE ([customers].[id] = 1)" } @@ -91,6 +92,7 @@ class AdapterTestSQLServer < ActiveRecord::TestCase it "return unquoted table name object from basic INSERT UPDATE and SELECT statements" do assert_equal "funny_jokes", connection.send(:get_table_name, basic_insert_sql) + assert_equal "ships", connection.send(:get_table_name, basic_merge_sql) assert_equal "customers", connection.send(:get_table_name, basic_update_sql) assert_equal "customers", connection.send(:get_table_name, basic_select_sql) end @@ -219,6 +221,10 @@ class AdapterTestSQLServer < ActiveRecord::TestCase @identity_insert_sql_unquoted_sp = "EXEC sp_executesql N'INSERT INTO funny_jokes (id, name) VALUES (@0, @1)', N'@0 int, @1 nvarchar(255)', @0 = 420, @1 = N'Knock knock'" @identity_insert_sql_unordered_sp = "EXEC sp_executesql N'INSERT INTO [funny_jokes] ([name],[id]) VALUES (@0, @1)', N'@0 nvarchar(255), @1 int', @0 = N'Knock knock', @1 = 420" + @identity_merge_sql = "MERGE INTO [ships] WITH (UPDLOCK, HOLDLOCK) AS target USING ( SELECT * FROM ( SELECT [id], [name], ROW_NUMBER() OVER ( PARTITION BY [id] ORDER BY [id] DESC ) AS rn_0 FROM ( VALUES (101, N'RSS Sir David Attenborough') ) AS t1 ([id], [name]) ) AS ranked_source WHERE rn_0 = 1 ) AS source ON (target.[id] = source.[id]) WHEN MATCHED THEN UPDATE SET target.[name] = source.[name] WHEN NOT MATCHED BY TARGET THEN INSERT ([id], [name]) 
VALUES (source.[id], source.[name]) OUTPUT INSERTED.[id]" + @identity_merge_sql_unquoted = "MERGE INTO ships WITH (UPDLOCK, HOLDLOCK) AS target USING ( SELECT * FROM ( SELECT id, name, ROW_NUMBER() OVER ( PARTITION BY id ORDER BY id DESC ) AS rn_0 FROM ( VALUES (101, N'RSS Sir David Attenborough') ) AS t1 (id, name) ) AS ranked_source WHERE rn_0 = 1 ) AS source ON (target.id = source.id) WHEN MATCHED THEN UPDATE SET target.name = source.name WHEN NOT MATCHED BY TARGET THEN INSERT (id, name) VALUES (source.id, source.name) OUTPUT INSERTED.id" + @identity_merge_sql_unordered = "MERGE INTO [ships] WITH (UPDLOCK, HOLDLOCK) AS target USING ( SELECT * FROM ( SELECT [name], [id], ROW_NUMBER() OVER ( PARTITION BY [id] ORDER BY [id] DESC ) AS rn_0 FROM ( VALUES (101, N'RSS Sir David Attenborough') ) AS t1 ([name], [id]) ) AS ranked_source WHERE rn_0 = 1 ) AS source ON (target.[id] = source.[id]) WHEN MATCHED THEN UPDATE SET target.[name] = source.[name] WHEN NOT MATCHED BY TARGET THEN INSERT ([name], [id]) VALUES (source.[name], source.[id]) OUTPUT INSERTED.[id]" + @identity_insert_sql_non_dbo = "INSERT INTO [test].[aliens] ([id],[name]) VALUES(420,'Mork')" @identity_insert_sql_non_dbo_unquoted = "INSERT INTO test.aliens ([id],[name]) VALUES(420,'Mork')" @identity_insert_sql_non_dbo_unordered = "INSERT INTO [test].[aliens] ([name],[id]) VALUES('Mork',420)" @@ -235,6 +241,10 @@ class AdapterTestSQLServer < ActiveRecord::TestCase assert_equal "[funny_jokes]", connection.send(:query_requires_identity_insert?, @identity_insert_sql_unquoted_sp) assert_equal "[funny_jokes]", connection.send(:query_requires_identity_insert?, @identity_insert_sql_unordered_sp) + assert_equal "[ships]", connection.send(:query_requires_identity_insert?, @identity_merge_sql) + assert_equal "[ships]", connection.send(:query_requires_identity_insert?, @identity_merge_sql_unquoted) + assert_equal "[ships]", connection.send(:query_requires_identity_insert?, @identity_merge_sql_unordered) + assert_equal "[test].[aliens]", connection.send(:query_requires_identity_insert?, @identity_insert_sql_non_dbo) assert_equal "[test].[aliens]", connection.send(:query_requires_identity_insert?, @identity_insert_sql_non_dbo_unquoted) assert_equal "[test].[aliens]", connection.send(:query_requires_identity_insert?, @identity_insert_sql_non_dbo_unordered) @@ -244,7 +254,7 @@ class AdapterTestSQLServer < ActiveRecord::TestCase end it "return false to #query_requires_identity_insert? 
for normal SQL" do - [basic_insert_sql, basic_update_sql, basic_select_sql].each do |sql| + [basic_insert_sql, basic_merge_sql, basic_update_sql, basic_select_sql].each do |sql| assert !connection.send(:query_requires_identity_insert?, sql), "SQL was #{sql}" end end diff --git a/test/cases/schema_test_sqlserver.rb b/test/cases/schema_test_sqlserver.rb index 7057a9ca6..fb9eb1cc0 100644 --- a/test/cases/schema_test_sqlserver.rb +++ b/test/cases/schema_test_sqlserver.rb @@ -102,6 +102,24 @@ class SchemaTestSQLServer < ActiveRecord::TestCase end end + describe "MERGE statements" do + it do + assert_equal "[dashboards]", connection.send(:get_raw_table_name, "MERGE INTO [dashboards] AS target") + end + + it do + assert_equal "lock_without_defaults", connection.send(:get_raw_table_name, "MERGE INTO lock_without_defaults AS target") + end + + it do + assert_equal "[WITH - SPACES]", connection.send(:get_raw_table_name, "MERGE INTO [WITH - SPACES] AS target") + end + + it do + assert_equal "[with].[select notation]", connection.send(:get_raw_table_name, "MERGE INTO [with].[select notation] AS target") + end + end + describe "CREATE VIEW statements" do it do assert_equal "test_table_as", connection.send(:get_raw_table_name, "CREATE VIEW test_views ( test_table_a_id, test_table_b_id ) AS SELECT test_table_as.id as test_table_a_id, test_table_bs.id as test_table_b_id FROM (test_table_as with(nolock) LEFT JOIN test_table_bs with(nolock) ON (test_table_as.id = test_table_bs.test_table_a_id))") From dd20bceca7ed25b91d3119172e3e0a236558add2 Mon Sep 17 00:00:00 2001 From: Andy Pfister Date: Sun, 16 Mar 2025 16:46:29 +0100 Subject: [PATCH 4/8] refactor code into more smaller methods --- .../sqlserver/database_statements.rb | 176 ++++++++++-------- 1 file changed, 99 insertions(+), 77 deletions(-) diff --git a/lib/active_record/connection_adapters/sqlserver/database_statements.rb b/lib/active_record/connection_adapters/sqlserver/database_statements.rb index c9e1253ff..2e466c664 100644 --- a/lib/active_record/connection_adapters/sqlserver/database_statements.rb +++ b/lib/active_record/connection_adapters/sqlserver/database_statements.rb @@ -156,47 +156,23 @@ def default_insert_value(column) def build_insert_sql(insert) # :nodoc: if insert.skip_duplicates? || insert.update_duplicates? insert_all = insert.send(:insert_all) - conflict_columns = get_conflicted_columns(insert_all:, insert:) + columns_with_uniqueness_constraints = get_columns_with_uniqueness_constraints(insert_all:, insert:) # if we do not have any columns that might have conflicting values, just execute a regular insert - return build_sql_for_regular_insert(insert) if conflict_columns.flatten.empty? - - # why is the "PARTITION BY" clause needed? - # in every DBMS system, insert_all / upsert_all is usually implemented with INSERT, that allows to define what happens - # when duplicates are found (SKIP OR UPDATE) - # by default rows are considered to be unique by every unique index on the table - # but since we have to use MERGE in MSSQL, which in return is a JOIN, we have to perform the "de-duplication" ourselves - # otherwise the "JOIN" clause would complain about non-unique values and being unable to JOIN the two tables - # this works easiest by using PARTITION and make sure that any record - # we are trying to insert is "the first one seen across all the potential conflicted columns" - sql = <<~SQL + return build_sql_for_regular_insert(insert) if columns_with_uniqueness_constraints.flatten.empty? 
+ sql = <<~SQL MERGE INTO #{insert.model.quoted_table_name} WITH (UPDLOCK, HOLDLOCK) AS target USING ( SELECT * FROM ( - SELECT #{insert.send(:columns_list)}, #{conflict_columns.map.with_index do |group_of_conflicted_columns, index| - <<~PARTITION_BY - ROW_NUMBER() OVER ( - PARTITION BY #{group_of_conflicted_columns.map { |column| quote_column_name(column) }.join(",")} - ORDER BY #{group_of_conflicted_columns.map { |column| "#{quote_column_name(column)} DESC" }.join(",")} - ) AS rn_#{index} - PARTITION_BY - end.join(", ") - } + SELECT #{insert.send(:columns_list)}, #{partition_by_columns_with_uniqueness_constraints(columns_with_uniqueness_constraints:)} FROM (#{insert.values_list}) AS t1 (#{insert.send(:columns_list)}) ) AS ranked_source - WHERE #{conflict_columns.map.with_index do |group_of_conflicted_columns, index| - "rn_#{index} = 1" - end.join(" AND ") - } + WHERE #{is_first_record_across_all_uniqueness_constraints(columns_with_uniqueness_constraints:)} ) AS source - ON (#{conflict_columns.map do |columns| - columns.map do |column| - "target.#{quote_column_name(column)} = source.#{quote_column_name(column)}" - end.join(" AND ") - end.join(") OR (")}) + ON (#{joining_on_columns_with_uniqueness_constraints(columns_with_uniqueness_constraints:)}) SQL if insert.update_duplicates? @@ -206,11 +182,7 @@ def build_insert_sql(insert) # :nodoc: sql << insert.raw_update_sql else if insert.record_timestamps? - sql << insert.model.timestamp_attributes_for_update_in_model.filter_map do |column_name| - if insert.send(:touch_timestamp_attribute?, column_name) - "target.#{quote_column_name(column_name)}=CASE WHEN (#{insert.updatable_columns.map { |column| "(COALESCE(target.#{quote_column_name(column)}, 'NULL') = COALESCE(source.#{quote_column_name(column)}, 'NULL'))" }.join(" AND ")}) THEN target.#{quote_column_name(column_name)} ELSE #{high_precision_current_timestamp} END," - end - end.join + sql << build_sql_for_recording_timestamps_when_updating(insert:) end sql << insert.updatable_columns.map { |column| "target.#{quote_column_name(column)}=source.#{quote_column_name(column)}" }.join(",") @@ -228,48 +200,6 @@ def build_insert_sql(insert) # :nodoc: build_sql_for_regular_insert(insert) end - def build_sql_for_returning(insert:, insert_all:) - return "" unless insert_all.returning - - returning_values_sql = if insert_all.returning.is_a?(String) - insert_all.returning - else - Array(insert_all.returning).map do |attribute| - if insert.model.attribute_alias?(attribute) - "INSERTED.#{quote_column_name(insert.model.attribute_alias(attribute))} AS #{quote_column_name(attribute)}" - else - "INSERTED.#{quote_column_name(attribute)}" - end - end.join(",") - end - - " OUTPUT #{returning_values_sql}" - end - private :build_sql_for_returning - - def get_conflicted_columns(insert_all:, insert:) - if (unique_by = insert_all.unique_by) - [unique_by.columns] - else - # Compare against every unique constraint (primary key included). - # Discard constraints that are not fully included on insert.keys. Prevents invalid queries. 
- # Example: ignore unique index for columns ["name"] if insert keys is ["description"] - (insert_all.send(:unique_indexes).map(&:columns) + [insert_all.primary_keys]).select do |columns| - columns.to_set.subset?(insert.keys) - end - end - end - private :get_conflicted_columns - - def build_sql_for_regular_insert(insert) - sql = "INSERT #{insert.into}" - - sql << build_sql_for_returning(insert:, insert_all: insert.send(:insert_all)) - sql << " #{insert.values_list}" - sql - end - private :build_sql_for_regular_insert - # === SQLServer Specific ======================================== # def execute_procedure(proc_name, *variables) @@ -571,6 +501,98 @@ def internal_raw_execute(sql, raw_connection, perform_do: false) result = raw_connection.execute(sql) perform_do ? result.do : result end + + # === SQLServer Specific (insert_all / upsert_all support) ===================== # + def build_sql_for_returning(insert:, insert_all:) + return "" unless insert_all.returning + + returning_values_sql = if insert_all.returning.is_a?(String) + insert_all.returning + else + Array(insert_all.returning).map do |attribute| + if insert.model.attribute_alias?(attribute) + "INSERTED.#{quote_column_name(insert.model.attribute_alias(attribute))} AS #{quote_column_name(attribute)}" + else + "INSERTED.#{quote_column_name(attribute)}" + end + end.join(",") + end + + " OUTPUT #{returning_values_sql}" + end + private :build_sql_for_returning + + def get_columns_with_uniqueness_constraints(insert_all:, insert:) + if (unique_by = insert_all.unique_by) + [unique_by.columns] + else + # Compare against every unique constraint (primary key included). + # Discard constraints that are not fully included on insert.keys. Prevents invalid queries. + # Example: ignore unique index for columns ["name"] if insert keys is ["description"] + (insert_all.send(:unique_indexes).map(&:columns) + [insert_all.primary_keys]).select do |columns| + columns.to_set.subset?(insert.keys) + end + end + end + private :get_columns_with_uniqueness_constraints + + def build_sql_for_regular_insert(insert) + sql = "INSERT #{insert.into}" + + sql << build_sql_for_returning(insert:, insert_all: insert.send(:insert_all)) + + sql << " #{insert.values_list}" + sql + end + private :build_sql_for_regular_insert + + # why is the "PARTITION BY" clause needed? 
+ # in every DBMS system, insert_all / upsert_all is usually implemented with INSERT, that allows to define what happens + # when duplicates are found (SKIP OR UPDATE) + # by default rows are considered to be unique by every unique index on the table + # but since we have to use MERGE in MSSQL, which in return is a JOIN, we have to perform the "de-duplication" ourselves + # otherwise the "JOIN" clause would complain about non-unique values and being unable to JOIN the two tables + # this works easiest by using PARTITION and make sure that any record + # we are trying to insert is "the first one seen across all the potential columns with uniquness constraints" + def partition_by_columns_with_uniqueness_constraints(columns_with_uniqueness_constraints:) + columns_with_uniqueness_constraints.map.with_index do |group_of_columns_with_uniqueness_constraints, index| + <<~PARTITION_BY + ROW_NUMBER() OVER ( + PARTITION BY #{group_of_columns_with_uniqueness_constraints.map { |column| quote_column_name(column) }.join(",")} + ORDER BY #{group_of_columns_with_uniqueness_constraints.map { |column| "#{quote_column_name(column)} DESC" }.join(",")} + ) AS rn_#{index} + PARTITION_BY + end.join(", ") + end + private :partition_by_columns_with_uniqueness_constraints + + def is_first_record_across_all_uniqueness_constraints(columns_with_uniqueness_constraints:) + columns_with_uniqueness_constraints.map.with_index do |group_of_columns_with_uniqueness_constraints, index| + "rn_#{index} = 1" + end.join(" AND ") + end + private :is_first_record_across_all_uniqueness_constraints + + def joining_on_columns_with_uniqueness_constraints(columns_with_uniqueness_constraints:) + columns_with_uniqueness_constraints.map do |columns| + columns.map do |column| + "target.#{quote_column_name(column)} = source.#{quote_column_name(column)}" + end.join(" AND ") + end.join(") OR (") + end + private :joining_on_columns_with_uniqueness_constraints + + # normally, generating the CASE SQL is done entirely by Rails + # and you would just hook into "touch_model_timestamps_unless" to add your database-specific instructions + # however, since we need to have "target." for the assignment, we also generate the CASE switch ourselves + def build_sql_for_recording_timestamps_when_updating(insert:) + insert.model.timestamp_attributes_for_update_in_model.filter_map do |column_name| + if insert.send(:touch_timestamp_attribute?, column_name) + "target.#{quote_column_name(column_name)}=CASE WHEN (#{insert.updatable_columns.map { |column| "(COALESCE(target.#{quote_column_name(column)}, 'NULL') = COALESCE(source.#{quote_column_name(column)}, 'NULL'))" }.join(" AND ")}) THEN target.#{quote_column_name(column_name)} ELSE #{high_precision_current_timestamp} END," + end + end.join + end + private :build_sql_for_recording_timestamps_when_updating end end end From 23020a97731c32449294033d5679e9f79f614cbf Mon Sep 17 00:00:00 2001 From: Andy Pfister Date: Sun, 16 Mar 2025 17:01:05 +0100 Subject: [PATCH 5/8] Describe `insert_all` / `upsert_all` support --- README.md | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/README.md b/README.md index f6062772d..294bcd990 100644 --- a/README.md +++ b/README.md @@ -169,6 +169,22 @@ ActiveRecord::ConnectionAdapters::SQLServerAdapter.showplan_option = 'SHOWPLAN_X ``` **NOTE:** The method we utilize to make SHOWPLANs work is very brittle to complex SQL. There is no getting around this as we have to deconstruct an already prepared statement for the sp_executesql method. 
If you find that explain breaks your app, simply disable it. Do not open a GitHub issue unless you have a patch. Please [consult the Rails guides](http://guides.rubyonrails.org/active_record_querying.html#running-explain) for more info.
 
+#### `insert_all` / `upsert_all` support
+
+`insert_all` and `upsert_all` on other database systems like MySQL, SQLite or PostgreSQL use a clause on their `INSERT` statement to either skip duplicates (`ON DUPLICATE KEY IGNORE`) or to update the existing record (`ON DUPLICATE KEY UPDATE`). Microsoft SQL Server does not offer these clauses, so support for these two options is implemented slightly differently.
+
+Behind the scenes, we execute a `MERGE` query, which joins the data you want to insert or update against the data already in the table. Because this is effectively a `JOIN`, we also need to remove any duplicates that would make the `JOIN` operation fail, e.g. something like this:
+
+```ruby
+Book.insert_all [
+  { id: 200, author_id: 8, name: "Refactoring" },
+  { id: 200, author_id: 8, name: "Refactoring" }
+]
+```
+
+The removal of duplicates happens inside the generated SQL query.
+
+Because of this implementation, if you pass `on_duplicate` to `upsert_all`, make sure to assign your value to `target.[column_name]` (e.g. `target.status = GREATEST(target.status, 1)`). To access the values that you want to upsert, use `source.[column_name]`.
 
 ## New Rails Applications

From d68c566b7fb1ece22ad672b7323e7e4852a110e3 Mon Sep 17 00:00:00 2001
From: Andy Pfister
Date: Wed, 19 Mar 2025 17:14:22 +0100
Subject: [PATCH 6/8] Replace `GREATEST` with `CASE`

---
 test/cases/coerced_tests.rb | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/test/cases/coerced_tests.rb b/test/cases/coerced_tests.rb
index 01e26ab6f..1cdd9eff8 100644
--- a/test/cases/coerced_tests.rb
+++ b/test/cases/coerced_tests.rb
@@ -2553,12 +2553,18 @@ def test_insert_with_type_casting_and_serialize_is_consistent_coerced
       Book.lease_connection.add_index(:books, [:author_id, :name], unique: true)
     end
 
-    # Same as original but using target.status for assignment and GREATEST for operator
+    # Same as original but using target.status for assignment and CASE instead of GREATEST for operator
     coerce_tests! :test_upsert_all_updates_using_provided_sql
     def test_upsert_all_updates_using_provided_sql_coerced
       Book.upsert_all(
         [{id: 1, status: 1}, {id: 2, status: 1}],
-        on_duplicate: Arel.sql("target.status = GREATEST(target.status, 1)")
+        on_duplicate: Arel.sql(<<~SQL
+          target.status = CASE
+            WHEN target.status > 1 THEN target.status
+            ELSE 1
+          END
+        SQL
+        )
       )
 
       assert_equal "published", Book.find(1).status
       assert_equal "written", Book.find(2).status
     end
   end
 
 module ActiveRecord

From e092ebb13a896dc4cadcb77d5ef7a5ffb9388af4 Mon Sep 17 00:00:00 2001
From: Andy Pfister
Date: Wed, 19 Mar 2025 17:26:40 +0100
Subject: [PATCH 7/8] Add Changelog entry

---
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 8c8b5e367..a4884550c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -3,6 +3,7 @@
 #### Added
 
 - [#1301](https://github.com/rails-sqlserver/activerecord-sqlserver-adapter/pull/1301) Add support for INDEX INCLUDE.
+- [#1312](https://github.com/rails-sqlserver/activerecord-sqlserver-adapter/pull/1312) Add support for `insert_all` and `upsert_all` #### Changed From db8199a1af173e5035650c7ff88bb3233e5ef423 Mon Sep 17 00:00:00 2001 From: Aidan Haran Date: Thu, 20 Mar 2025 13:33:36 +0000 Subject: [PATCH 8/8] Small refactor of Support insert_all and upsert_all using MERGE (#1314) --- .../sqlserver/database_statements.rb | 88 ++++++++++--------- 1 file changed, 45 insertions(+), 43 deletions(-) diff --git a/lib/active_record/connection_adapters/sqlserver/database_statements.rb b/lib/active_record/connection_adapters/sqlserver/database_statements.rb index 2e466c664..92e91665d 100644 --- a/lib/active_record/connection_adapters/sqlserver/database_statements.rb +++ b/lib/active_record/connection_adapters/sqlserver/database_statements.rb @@ -154,50 +154,54 @@ def default_insert_value(column) private :default_insert_value def build_insert_sql(insert) # :nodoc: - if insert.skip_duplicates? || insert.update_duplicates? - insert_all = insert.send(:insert_all) - columns_with_uniqueness_constraints = get_columns_with_uniqueness_constraints(insert_all:, insert:) - - # if we do not have any columns that might have conflicting values, just execute a regular insert - return build_sql_for_regular_insert(insert) if columns_with_uniqueness_constraints.flatten.empty? - - sql = <<~SQL - MERGE INTO #{insert.model.quoted_table_name} WITH (UPDLOCK, HOLDLOCK) AS target - USING ( - SELECT * - FROM ( - SELECT #{insert.send(:columns_list)}, #{partition_by_columns_with_uniqueness_constraints(columns_with_uniqueness_constraints:)} - FROM (#{insert.values_list}) - AS t1 (#{insert.send(:columns_list)}) - ) AS ranked_source - WHERE #{is_first_record_across_all_uniqueness_constraints(columns_with_uniqueness_constraints:)} - ) AS source - ON (#{joining_on_columns_with_uniqueness_constraints(columns_with_uniqueness_constraints:)}) - SQL - - if insert.update_duplicates? - sql << " WHEN MATCHED THEN UPDATE SET " - - if insert.raw_update_sql? - sql << insert.raw_update_sql - else - if insert.record_timestamps? - sql << build_sql_for_recording_timestamps_when_updating(insert:) - end + # Use regular insert if not skipping/updating duplicates. + return build_sql_for_regular_insert(insert:) unless insert.skip_duplicates? || insert.update_duplicates? - sql << insert.updatable_columns.map { |column| "target.#{quote_column_name(column)}=source.#{quote_column_name(column)}" }.join(",") - end - end - sql << " WHEN NOT MATCHED BY TARGET THEN" - sql << " INSERT (#{insert.send(:columns_list)}) VALUES (#{insert_all.keys_including_timestamps.map { |column| "source.#{quote_column_name(column)}" }.join(", ")})" - sql << build_sql_for_returning(insert:, insert_all: insert.send(:insert_all)) + insert_all = insert.send(:insert_all) + columns_with_uniqueness_constraints = get_columns_with_uniqueness_constraints(insert_all:, insert:) + + # If we do not have any columns that might have conflicting values just execute a regular insert, else use merge. + if columns_with_uniqueness_constraints.flatten.empty? 
+ build_sql_for_regular_insert(insert:) + else + build_sql_for_merge_insert(insert:, insert_all:, columns_with_uniqueness_constraints:) + end + end - sql << ";" + def build_sql_for_merge_insert(insert:, insert_all:, columns_with_uniqueness_constraints:) # :nodoc: + sql = <<~SQL + MERGE INTO #{insert.model.quoted_table_name} WITH (UPDLOCK, HOLDLOCK) AS target + USING ( + SELECT * + FROM ( + SELECT #{insert.send(:columns_list)}, #{partition_by_columns_with_uniqueness_constraints(columns_with_uniqueness_constraints:)} + FROM (#{insert.values_list}) + AS t1 (#{insert.send(:columns_list)}) + ) AS ranked_source + WHERE #{is_first_record_across_all_uniqueness_constraints(columns_with_uniqueness_constraints:)} + ) AS source + ON (#{joining_on_columns_with_uniqueness_constraints(columns_with_uniqueness_constraints:)}) + SQL + + if insert.update_duplicates? + sql << " WHEN MATCHED THEN UPDATE SET " + + if insert.raw_update_sql? + sql << insert.raw_update_sql + else + if insert.record_timestamps? + sql << build_sql_for_recording_timestamps_when_updating(insert:) + end - return sql + sql << insert.updatable_columns.map { |column| "target.#{quote_column_name(column)}=source.#{quote_column_name(column)}" }.join(",") + end end + sql << " WHEN NOT MATCHED BY TARGET THEN" + sql << " INSERT (#{insert.send(:columns_list)}) VALUES (#{insert_all.keys_including_timestamps.map { |column| "source.#{quote_column_name(column)}" }.join(", ")})" + sql << build_sql_for_returning(insert:, insert_all: insert.send(:insert_all)) + sql << ";" - build_sql_for_regular_insert(insert) + sql end # === SQLServer Specific ======================================== # @@ -536,11 +540,9 @@ def get_columns_with_uniqueness_constraints(insert_all:, insert:) end private :get_columns_with_uniqueness_constraints - def build_sql_for_regular_insert(insert) + def build_sql_for_regular_insert(insert:) sql = "INSERT #{insert.into}" - sql << build_sql_for_returning(insert:, insert_all: insert.send(:insert_all)) - sql << " #{insert.values_list}" sql end @@ -553,7 +555,7 @@ def build_sql_for_regular_insert(insert) # but since we have to use MERGE in MSSQL, which in return is a JOIN, we have to perform the "de-duplication" ourselves # otherwise the "JOIN" clause would complain about non-unique values and being unable to JOIN the two tables # this works easiest by using PARTITION and make sure that any record - # we are trying to insert is "the first one seen across all the potential columns with uniquness constraints" + # we are trying to insert is "the first one seen across all the potential columns with uniqueness constraints" def partition_by_columns_with_uniqueness_constraints(columns_with_uniqueness_constraints:) columns_with_uniqueness_constraints.map.with_index do |group_of_columns_with_uniqueness_constraints, index| <<~PARTITION_BY