Skip to content

Commit 2728956

Browse files
authored
Support insert_all and upsert_all using MERGE (#1315)
1 parent 0e7e24f commit 2728956

File tree

8 files changed

+234
-59
lines changed

8 files changed

+234
-59
lines changed

CHANGELOG.md

+6
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,9 @@
1+
## Unreleased
2+
3+
#### Added
4+
5+
- [#1315](https://github.com/rails-sqlserver/activerecord-sqlserver-adapter/pull/1315) Add support for `insert_all` and `upsert_all`
6+
17
## v8.0.4
28

39
#### Fixed

README.md

+16
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,22 @@ ActiveRecord::ConnectionAdapters::SQLServerAdapter.showplan_option = 'SHOWPLAN_X
168168
```
169169
**NOTE:** The method we utilize to make SHOWPLANs work is very brittle to complex SQL. There is no getting around this as we have to deconstruct an already prepared statement for the sp_executesql method. If you find that explain breaks your app, simply disable it. Do not open a GitHub issue unless you have a patch. Please [consult the Rails guides](http://guides.rubyonrails.org/active_record_querying.html#running-explain) for more info.
170170

171+
#### `insert_all` / `upsert_all` support
172+
173+
`insert_all` and `upsert_all` on other database systems such as MySQL, SQLite, or PostgreSQL use a clause with their `INSERT` statement to either skip duplicates (`ON DUPLICATE KEY IGNORE`) or to update the existing record (`ON DUPLICATE KEY UPDATE`). Microsoft SQL Server does not offer these clauses, so the support for these two options is implemented slightly differently.
174+
175+
Behind the scenes, we execute a `MERGE` query, which joins the data you want to insert or update with the table that already exists on the server. The emphasis here is on "joining", so we also need to remove any duplicates that might make the `JOIN` operation fail, e.g. something like this:
176+
177+
```ruby
178+
Book.insert_all [
179+
{ id: 200, author_id: 8, name: "Refactoring" },
180+
{ id: 200, author_id: 8, name: "Refactoring" }
181+
]
182+
```
183+
184+
The removal of duplicates happens during the SQL query.
185+
186+
Because of this implementation, if you pass `on_duplicate` to `upsert_all`, make sure to assign your value to `target.[column_name]` (e.g. `target.status = GREATEST(target.status, 1)`). To access the values that you want to upsert, use `source.[column_name]`.
171187

172188
## New Rails Applications
173189

lib/active_record/connection_adapters/sqlserver/database_statements.rb

+145-12
Original file line numberDiff line numberDiff line change
@@ -143,18 +143,54 @@ def default_insert_value(column)
143143
private :default_insert_value
144144

145145
# Builds the SQL statement used by +insert_all+ / +upsert_all+.
#
# A plain INSERT is produced when duplicates are neither skipped nor updated,
# or when no uniqueness constraint applies to the inserted columns. In every
# other case a MERGE statement is generated.
def build_insert_sql(insert) # :nodoc:
  handles_duplicates = insert.skip_duplicates? || insert.update_duplicates?
  return build_sql_for_regular_insert(insert:) unless handles_duplicates

  insert_all = insert.send(:insert_all)
  constraint_groups = get_columns_with_uniqueness_constraints(insert_all:, insert:)

  # Without any column that could hold a conflicting value there is nothing to merge on.
  return build_sql_for_regular_insert(insert:) if constraint_groups.flatten.empty?

  build_sql_for_merge_insert(insert:, insert_all:, columns_with_uniqueness_constraints: constraint_groups)
end
159+
160+
161+
# Builds the MERGE statement that emulates "insert, skipping or updating duplicates".
#
# insert::     the insert builder; supplies the model, column list, values list and update options.
# insert_all:: the insert_all object behind the builder; supplies the keys and the returning option.
# columns_with_uniqueness_constraints:: array of column groups used to de-duplicate the source
#                                       rows and to join source and target.
#
# Returns the complete SQL string, terminated by ";".
def build_sql_for_merge_insert(insert:, insert_all:, columns_with_uniqueness_constraints:) # :nodoc:
  # The source rows are ranked per uniqueness constraint and only the first row of
  # every partition is kept, so the join below never sees duplicate source rows.
  sql = <<~SQL
    MERGE INTO #{insert.model.quoted_table_name} WITH (UPDLOCK, HOLDLOCK) AS target
    USING (
      SELECT *
      FROM (
        SELECT #{insert.send(:columns_list)}, #{partition_by_columns_with_uniqueness_constraints(columns_with_uniqueness_constraints:)}
        FROM (#{insert.values_list})
        AS t1 (#{insert.send(:columns_list)})
      ) AS ranked_source
      WHERE #{is_first_record_across_all_uniqueness_constraints(columns_with_uniqueness_constraints:)}
    ) AS source
    ON (#{joining_on_columns_with_uniqueness_constraints(columns_with_uniqueness_constraints:)})
  SQL

  if insert.update_duplicates?
    sql << " WHEN MATCHED THEN UPDATE SET "

    if insert.raw_update_sql?
      # Caller-supplied SQL is appended as-is.
      sql << insert.raw_update_sql
    else
      sql << build_sql_for_recording_timestamps_when_updating(insert:) if insert.record_timestamps?
      sql << insert.updatable_columns.map { |column| "target.#{quote_column_name(column)}=source.#{quote_column_name(column)}" }.join(",")
    end
  end

  source_values = insert_all.keys_including_timestamps.map { |column| "source.#{quote_column_name(column)}" }.join(", ")

  sql << " WHEN NOT MATCHED BY TARGET THEN"
  sql << " INSERT (#{insert.send(:columns_list)}) VALUES (#{source_values})"
  # Use the insert_all object handed to this method instead of fetching it again from the builder.
  sql << build_sql_for_returning(insert:, insert_all:)
  sql << ";"

  sql
end
160196

@@ -406,11 +442,18 @@ def query_requires_identity_insert?(sql)
406442
raw_table_name = get_raw_table_name(sql)
407443
id_column = identity_columns(raw_table_name).first
408444

409-
id_column && sql =~ /^\s*(INSERT|EXEC sp_executesql N'INSERT)[^(]+\([^)]*\b(#{id_column.name})\b,?[^)]*\)/i ? SQLServer::Utils.extract_identifiers(raw_table_name).quoted : false
445+
if id_column && (
446+
sql =~ /^\s*(INSERT|EXEC sp_executesql N'INSERT)[^(]+\([^)]*\b(#{id_column.name})\b,?[^)]*\)/i ||
447+
sql =~ /^\s*MERGE INTO.+THEN INSERT \([^)]*\b(#{id_column.name})\b,?[^)]*\)/im
448+
)
449+
SQLServer::Utils.extract_identifiers(raw_table_name).quoted
450+
else
451+
false
452+
end
410453
end
411454

412455
# True when +sql+ starts (after optional whitespace) with an INSERT, an
# sp_executesql-wrapped INSERT, or a MERGE INTO that contains a THEN INSERT clause.
def insert_sql?(sql)
  /\A\s*(INSERT|EXEC sp_executesql N'INSERT|MERGE INTO.+THEN INSERT)/im.match?(sql)
end
415458

416459
def identity_columns(table_name)
@@ -455,6 +498,96 @@ def internal_raw_execute(sql, raw_connection, perform_do: false)
455498

456499
perform_do ? result.do : result
457500
end
501+
502+
# === SQLServer Specific (insert_all / upsert_all support) ===================== #
503+
# Builds the " OUTPUT ..." fragment for the insert_all +returning+ option.
#
# Returns an empty string when nothing is to be returned. A String +returning+
# value is used verbatim; otherwise each attribute is read from INSERTED, and
# attribute aliases are resolved to the aliased column and re-exposed under the
# alias name.
def build_sql_for_returning(insert:, insert_all:)
  returning = insert_all.returning
  return "" unless returning

  output_list =
    if returning.is_a?(String)
      returning
    else
      model = insert.model
      output_columns = Array(returning).map do |attribute|
        if model.attribute_alias?(attribute)
          "INSERTED.#{quote_column_name(model.attribute_alias(attribute))} AS #{quote_column_name(attribute)}"
        else
          "INSERTED.#{quote_column_name(attribute)}"
        end
      end
      output_columns.join(",")
    end

  " OUTPUT #{output_list}"
end
520+
private :build_sql_for_returning
521+
522+
# Returns the column groups that decide whether a row counts as a duplicate.
#
# With an explicit +unique_by+ only its columns are used. Otherwise every unique
# index plus the primary key is a candidate, and a candidate is kept only when
# all of its columns are among the inserted keys; a constraint on ["name"] is
# ignored when only ["description"] is inserted, as it would yield an invalid query.
def get_columns_with_uniqueness_constraints(insert_all:, insert:)
  unique_by = insert_all.unique_by
  return [unique_by.columns] if unique_by

  candidate_groups = insert_all.send(:unique_indexes).map(&:columns)
  candidate_groups += [insert_all.primary_keys]

  candidate_groups.select { |columns| columns.to_set.subset?(insert.keys) }
end
534+
private :get_columns_with_uniqueness_constraints
535+
536+
# Builds a plain "INSERT <into> [OUTPUT ...] <values>" statement without any
# duplicate handling.
def build_sql_for_regular_insert(insert:)
  [
    "INSERT #{insert.into}",
    build_sql_for_returning(insert:, insert_all: insert.send(:insert_all)),
    " #{insert.values_list}"
  ].join
end
542+
private :build_sql_for_regular_insert
543+
544+
# Why the "PARTITION BY" clause is needed:
#
# Other database systems implement insert_all / upsert_all with an INSERT that
# lets you state what happens on duplicates (skip or update), where rows are by
# default considered unique per unique index of the table. SQL Server has to use
# MERGE instead, which is a JOIN, so the de-duplication must be done here;
# otherwise the join would fail on non-unique source rows.
#
# Each group of constrained columns gets its own ROW_NUMBER() ranking, exposed as
# rn_0, rn_1, ..., so that only the first row seen per constraint can be kept.
#
# NOTE(review): ORDER BY uses the same columns as PARTITION BY, so the order
# inside a partition is presumably arbitrary -- confirm which duplicate should win.
def partition_by_columns_with_uniqueness_constraints(columns_with_uniqueness_constraints:)
  rankings = columns_with_uniqueness_constraints.each_with_index.map do |columns, position|
    quoted_columns = columns.map { |column| quote_column_name(column) }

    <<~PARTITION_BY
      ROW_NUMBER() OVER (
        PARTITION BY #{quoted_columns.join(",")}
        ORDER BY #{quoted_columns.map { |quoted| "#{quoted} DESC" }.join(",")}
      ) AS rn_#{position}
    PARTITION_BY
  end

  rankings.join(", ")
end
562+
private :partition_by_columns_with_uniqueness_constraints
563+
564+
# Builds the WHERE condition requiring row number 1 in every ranking column
# (rn_0, rn_1, ...), one ranking column per group of constrained columns.
def is_first_record_across_all_uniqueness_constraints(columns_with_uniqueness_constraints:)
  conditions = columns_with_uniqueness_constraints.each_with_index.map do |_columns, position|
    "rn_#{position} = 1"
  end

  conditions.join(" AND ")
end
569+
private :is_first_record_across_all_uniqueness_constraints
570+
571+
# Builds the inside of the MERGE "ON (...)" clause: the columns of one constraint
# are AND-ed together and the constraints are separated by ") OR (". The
# outermost parentheses are supplied by the caller.
def joining_on_columns_with_uniqueness_constraints(columns_with_uniqueness_constraints:)
  per_constraint = columns_with_uniqueness_constraints.map do |columns|
    equalities = columns.map do |column|
      "target.#{quote_column_name(column)} = source.#{quote_column_name(column)}"
    end

    equalities.join(" AND ")
  end

  per_constraint.join(") OR (")
end
578+
private :joining_on_columns_with_uniqueness_constraints
579+
580+
# Normally Rails generates the timestamp CASE expression itself and an adapter
# only hooks into "touch_model_timestamps_unless". The MERGE assignment needs the
# "target." prefix, so the CASE expression is generated here instead.
#
# For every update-timestamp attribute that should be touched, emits
#   target.[ts]=CASE WHEN (<all updatable columns unchanged>) THEN target.[ts] ELSE <now> END,
# Every fragment ends with a comma because the caller appends the column
# assignments directly afterwards. Returns "" when no timestamp is to be touched.
#
# "Unchanged" is tested with an explicit NULL-safe comparison. Wrapping both sides
# in COALESCE(..., 'NULL') makes SQL Server convert the literal 'NULL' to the
# column's type, which fails for NULL values in e.g. integer or datetime columns,
# and it would also treat the string 'NULL' as equal to a real NULL.
def build_sql_for_recording_timestamps_when_updating(insert:)
  unchanged_condition = insert.updatable_columns.map do |column|
    target_column = "target.#{quote_column_name(column)}"
    source_column = "source.#{quote_column_name(column)}"

    "(#{target_column} = #{source_column} OR (#{target_column} IS NULL AND #{source_column} IS NULL))"
  end.join(" AND ")

  insert.model.timestamp_attributes_for_update_in_model.filter_map do |column_name|
    next unless insert.send(:touch_timestamp_attribute?, column_name)

    timestamp_column = "target.#{quote_column_name(column_name)}"
    "#{timestamp_column}=CASE WHEN (#{unchanged_condition}) THEN #{timestamp_column} ELSE #{high_precision_current_timestamp} END,"
  end.join
end
590+
private :build_sql_for_recording_timestamps_when_updating
458591
end
459592
end
460593
end

lib/active_record/connection_adapters/sqlserver/schema_statements.rb

+2
Original file line numberDiff line numberDiff line change
@@ -720,6 +720,8 @@ def get_raw_table_name(sql)
720720
.match(/\s*([^(]*)/i)[0]
721721
elsif s.match?(/^\s*UPDATE\s+.*/i)
722722
s.match(/UPDATE\s+([^\(\s]+)\s*/i)[1]
723+
elsif s.match?(/^\s*MERGE INTO.*/i)
724+
s.match(/^\s*MERGE\s+INTO\s+(\[?[a-z_ -]+\]?\.?\[?[a-z_ -]+\]?)\s+(AS|WITH|USING)/i)[1]
723725
else
724726
s.match(/FROM[\s|\(]+((\[[^\(\]]+\])|[^\(\s]+)\s*/i)[1]
725727
end.strip

lib/active_record/connection_adapters/sqlserver_adapter.rb

+2-2
Original file line numberDiff line numberDiff line change
@@ -212,11 +212,11 @@ def supports_insert_returning?
212212
end
213213

214214
# Skipping duplicate rows on insert is supported.
def supports_insert_on_duplicate_skip? = true
217217

218218
# Updating duplicate rows on insert (upsert) is supported.
def supports_insert_on_duplicate_update? = true
221221

222222
def supports_insert_conflict_target?

test/cases/adapter_test_sqlserver.rb

+11-1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ class AdapterTestSQLServer < ActiveRecord::TestCase
1313
fixtures :tasks
1414

1515
let(:basic_insert_sql) { "INSERT INTO [funny_jokes] ([name]) VALUES('Knock knock')" }
16+
let(:basic_merge_sql) { "MERGE INTO [ships] WITH (UPDLOCK, HOLDLOCK) AS target USING ( SELECT * FROM ( SELECT [id], [name], ROW_NUMBER() OVER ( PARTITION BY [id] ORDER BY [id] DESC ) AS rn_0 FROM ( VALUES (101, N'RSS Sir David Attenborough') ) AS t1 ([id], [name]) ) AS ranked_source WHERE rn_0 = 1 ) AS source ON (target.[id] = source.[id]) WHEN MATCHED THEN UPDATE SET target.[name] = source.[name]" }
1617
let(:basic_update_sql) { "UPDATE [customers] SET [address_street] = NULL WHERE [id] = 2" }
1718
let(:basic_select_sql) { "SELECT * FROM [customers] WHERE ([customers].[id] = 1)" }
1819

@@ -93,6 +94,7 @@ class AdapterTestSQLServer < ActiveRecord::TestCase
9394

9495
it "return unquoted table name object from basic INSERT UPDATE and SELECT statements" do
  # Each statement is paired with the table name that should be extracted from it.
  expected_table_names = [
    [basic_insert_sql, "funny_jokes"],
    [basic_merge_sql, "ships"],
    [basic_update_sql, "customers"],
    [basic_select_sql, "customers"]
  ]

  expected_table_names.each do |sql, table_name|
    assert_equal table_name, connection.send(:get_table_name, sql)
  end
end
@@ -213,6 +215,10 @@ class AdapterTestSQLServer < ActiveRecord::TestCase
213215
@identity_insert_sql_unquoted_sp = "EXEC sp_executesql N'INSERT INTO funny_jokes (id, name) VALUES (@0, @1)', N'@0 int, @1 nvarchar(255)', @0 = 420, @1 = N'Knock knock'"
214216
@identity_insert_sql_unordered_sp = "EXEC sp_executesql N'INSERT INTO [funny_jokes] ([name],[id]) VALUES (@0, @1)', N'@0 nvarchar(255), @1 int', @0 = N'Knock knock', @1 = 420"
215217

218+
@identity_merge_sql = "MERGE INTO [ships] WITH (UPDLOCK, HOLDLOCK) AS target USING ( SELECT * FROM ( SELECT [id], [name], ROW_NUMBER() OVER ( PARTITION BY [id] ORDER BY [id] DESC ) AS rn_0 FROM ( VALUES (101, N'RSS Sir David Attenborough') ) AS t1 ([id], [name]) ) AS ranked_source WHERE rn_0 = 1 ) AS source ON (target.[id] = source.[id]) WHEN MATCHED THEN UPDATE SET target.[name] = source.[name] WHEN NOT MATCHED BY TARGET THEN INSERT ([id], [name]) VALUES (source.[id], source.[name]) OUTPUT INSERTED.[id]"
219+
@identity_merge_sql_unquoted = "MERGE INTO ships WITH (UPDLOCK, HOLDLOCK) AS target USING ( SELECT * FROM ( SELECT id, name, ROW_NUMBER() OVER ( PARTITION BY id ORDER BY id DESC ) AS rn_0 FROM ( VALUES (101, N'RSS Sir David Attenborough') ) AS t1 (id, name) ) AS ranked_source WHERE rn_0 = 1 ) AS source ON (target.id = source.id) WHEN MATCHED THEN UPDATE SET target.name = source.name WHEN NOT MATCHED BY TARGET THEN INSERT (id, name) VALUES (source.id, source.name) OUTPUT INSERTED.id"
220+
@identity_merge_sql_unordered = "MERGE INTO [ships] WITH (UPDLOCK, HOLDLOCK) AS target USING ( SELECT * FROM ( SELECT [name], [id], ROW_NUMBER() OVER ( PARTITION BY [id] ORDER BY [id] DESC ) AS rn_0 FROM ( VALUES (101, N'RSS Sir David Attenborough') ) AS t1 ([name], [id]) ) AS ranked_source WHERE rn_0 = 1 ) AS source ON (target.[id] = source.[id]) WHEN MATCHED THEN UPDATE SET target.[name] = source.[name] WHEN NOT MATCHED BY TARGET THEN INSERT ([name], [id]) VALUES (source.[name], source.[id]) OUTPUT INSERTED.[id]"
221+
216222
@identity_insert_sql_non_dbo = "INSERT INTO [test].[aliens] ([id],[name]) VALUES(420,'Mork')"
217223
@identity_insert_sql_non_dbo_unquoted = "INSERT INTO test.aliens ([id],[name]) VALUES(420,'Mork')"
218224
@identity_insert_sql_non_dbo_unordered = "INSERT INTO [test].[aliens] ([name],[id]) VALUES('Mork',420)"
@@ -229,6 +235,10 @@ class AdapterTestSQLServer < ActiveRecord::TestCase
229235
assert_equal "[funny_jokes]", connection.send(:query_requires_identity_insert?, @identity_insert_sql_unquoted_sp)
230236
assert_equal "[funny_jokes]", connection.send(:query_requires_identity_insert?, @identity_insert_sql_unordered_sp)
231237

238+
assert_equal "[ships]", connection.send(:query_requires_identity_insert?, @identity_merge_sql)
239+
assert_equal "[ships]", connection.send(:query_requires_identity_insert?, @identity_merge_sql_unquoted)
240+
assert_equal "[ships]", connection.send(:query_requires_identity_insert?, @identity_merge_sql_unordered)
241+
232242
assert_equal "[test].[aliens]", connection.send(:query_requires_identity_insert?, @identity_insert_sql_non_dbo)
233243
assert_equal "[test].[aliens]", connection.send(:query_requires_identity_insert?, @identity_insert_sql_non_dbo_unquoted)
234244
assert_equal "[test].[aliens]", connection.send(:query_requires_identity_insert?, @identity_insert_sql_non_dbo_unordered)
@@ -238,7 +248,7 @@ class AdapterTestSQLServer < ActiveRecord::TestCase
238248
end
239249

240250
it "return false to #query_requires_identity_insert? for normal SQL" do
  # None of these statements should require IDENTITY_INSERT.
  statements = [basic_insert_sql, basic_merge_sql, basic_update_sql, basic_select_sql]

  statements.each do |sql|
    refute connection.send(:query_requires_identity_insert?, sql), "SQL was #{sql}"
  end
end

0 commit comments

Comments
 (0)