Skip to content

Commit aa0d8ef

Browse files
authored
Support insert_all and upsert_all using MERGE (#1312)
1 parent 8739eea commit aa0d8ef

File tree

8 files changed

+208
-13
lines changed

8 files changed

+208
-13
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#### Added
44

55
- [#1301](https://github.com/rails-sqlserver/activerecord-sqlserver-adapter/pull/1301) Add support for INDEX INCLUDE.
6+
- [#1312](https://github.com/rails-sqlserver/activerecord-sqlserver-adapter/pull/1312) Add support for `insert_all` and `upsert_all`.
67

78
#### Changed
89

README.md

+16
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,22 @@ ActiveRecord::ConnectionAdapters::SQLServerAdapter.showplan_option = 'SHOWPLAN_X
169169
```
170170
**NOTE:** The method we utilize to make SHOWPLANs work is very brittle to complex SQL. There is no getting around this as we have to deconstruct an already prepared statement for the sp_executesql method. If you find that explain breaks your app, simply disable it. Do not open a GitHub issue unless you have a patch. Please [consult the Rails guides](http://guides.rubyonrails.org/active_record_querying.html#running-explain) for more info.
171171

172+
#### `insert_all` / `upsert_all` support
173+
174+
`insert_all` and `upsert_all` on other database systems like MySQL, SQLite or PostgreSQL use a clause with their `INSERT` statement to either skip duplicates (`ON DUPLICATE KEY IGNORE`) or to update the existing record (`ON DUPLICATE KEY UPDATE`). Microsoft SQL Server does not offer these clauses, so the support for these two options is implemented slightly differently.
175+
176+
Behind the scenes, we execute a `MERGE` query, which joins the data that you want to insert or update with the table that already exists on the server. The emphasis here is on "JOINING", so we also need to remove any duplicates that might make the `JOIN` operation fail, e.g. something like this:
177+
178+
```ruby
179+
Book.insert_all [
180+
{ id: 200, author_id: 8, name: "Refactoring" },
181+
{ id: 200, author_id: 8, name: "Refactoring" }
182+
]
183+
```
184+
185+
The removal of duplicates happens during the SQL query.
186+
187+
Because of this implementation, if you pass `on_duplicate` to `upsert_all`, make sure to assign your value to `target.[column_name]` (e.g. `target.status = GREATEST(target.status, 1)`). To access the values that you want to upsert, use `source.[column_name]`.
172188

173189
## New Rails Applications
174190

lib/active_record/connection_adapters/sqlserver/database_statements.rb

+140-10
Original file line numberDiff line numberDiff line change
@@ -154,20 +154,53 @@ def default_insert_value(column)
154154
private :default_insert_value
155155

156156
def build_insert_sql(insert) # :nodoc:
157-
sql = "INSERT #{insert.into}"
157+
# Use regular insert if not skipping/updating duplicates.
158+
return build_sql_for_regular_insert(insert:) unless insert.skip_duplicates? || insert.update_duplicates?
159+
160+
insert_all = insert.send(:insert_all)
161+
columns_with_uniqueness_constraints = get_columns_with_uniqueness_constraints(insert_all:, insert:)
158162

159-
returning = insert.send(:insert_all).returning
163+
# If we do not have any columns that might have conflicting values just execute a regular insert, else use merge.
164+
if columns_with_uniqueness_constraints.flatten.empty?
165+
build_sql_for_regular_insert(insert:)
166+
else
167+
build_sql_for_merge_insert(insert:, insert_all:, columns_with_uniqueness_constraints:)
168+
end
169+
end
160170

161-
if returning
162-
returning_sql = if returning.is_a?(String)
163-
returning
171+
def build_sql_for_merge_insert(insert:, insert_all:, columns_with_uniqueness_constraints:) # :nodoc:
172+
sql = <<~SQL
173+
MERGE INTO #{insert.model.quoted_table_name} WITH (UPDLOCK, HOLDLOCK) AS target
174+
USING (
175+
SELECT *
176+
FROM (
177+
SELECT #{insert.send(:columns_list)}, #{partition_by_columns_with_uniqueness_constraints(columns_with_uniqueness_constraints:)}
178+
FROM (#{insert.values_list})
179+
AS t1 (#{insert.send(:columns_list)})
180+
) AS ranked_source
181+
WHERE #{is_first_record_across_all_uniqueness_constraints(columns_with_uniqueness_constraints:)}
182+
) AS source
183+
ON (#{joining_on_columns_with_uniqueness_constraints(columns_with_uniqueness_constraints:)})
184+
SQL
185+
186+
if insert.update_duplicates?
187+
sql << " WHEN MATCHED THEN UPDATE SET "
188+
189+
if insert.raw_update_sql?
190+
sql << insert.raw_update_sql
164191
else
165-
Array(returning).map { |column| "INSERTED.#{quote_column_name(column)}" }.join(", ")
192+
if insert.record_timestamps?
193+
sql << build_sql_for_recording_timestamps_when_updating(insert:)
194+
end
195+
196+
sql << insert.updatable_columns.map { |column| "target.#{quote_column_name(column)}=source.#{quote_column_name(column)}" }.join(",")
166197
end
167-
sql << " OUTPUT #{returning_sql}"
168198
end
199+
sql << " WHEN NOT MATCHED BY TARGET THEN"
200+
sql << " INSERT (#{insert.send(:columns_list)}) VALUES (#{insert_all.keys_including_timestamps.map { |column| "source.#{quote_column_name(column)}" }.join(", ")})"
201+
sql << build_sql_for_returning(insert:, insert_all: insert.send(:insert_all))
202+
sql << ";"
169203

170-
sql << " #{insert.values_list}"
171204
sql
172205
end
173206

@@ -418,11 +451,18 @@ def query_requires_identity_insert?(sql)
418451
raw_table_name = get_raw_table_name(sql)
419452
id_column = identity_columns(raw_table_name).first
420453

421-
(id_column && sql =~ /^\s*(INSERT|EXEC sp_executesql N'INSERT)[^(]+\([^)]*\b(#{id_column.name})\b,?[^)]*\)/i) ? SQLServer::Utils.extract_identifiers(raw_table_name).quoted : false
454+
if id_column && (
455+
sql =~ /^\s*(INSERT|EXEC sp_executesql N'INSERT)[^(]+\([^)]*\b(#{id_column.name})\b,?[^)]*\)/i ||
456+
sql =~ /^\s*MERGE INTO.+THEN INSERT \([^)]*\b(#{id_column.name})\b,?[^)]*\)/im
457+
)
458+
SQLServer::Utils.extract_identifiers(raw_table_name).quoted
459+
else
460+
false
461+
end
422462
end
423463

424464
def insert_sql?(sql)
425-
!(sql =~ /\A\s*(INSERT|EXEC sp_executesql N'INSERT)/i).nil?
465+
!(sql =~ /\A\s*(INSERT|EXEC sp_executesql N'INSERT|MERGE INTO.+THEN INSERT)/im).nil?
426466
end
427467

428468
def identity_columns(table_name)
@@ -465,6 +505,96 @@ def internal_raw_execute(sql, raw_connection, perform_do: false)
465505
result = raw_connection.execute(sql)
466506
perform_do ? result.do : result
467507
end
508+
509+
# === SQLServer Specific (insert_all / upsert_all support) ===================== #
510+
# Builds the OUTPUT clause that makes an INSERT or MERGE statement return values.
#
# insert     - builder object; only its +model+ is consulted, to resolve attribute aliases.
# insert_all - object whose +returning+ option decides what is emitted.
#
# Returns "" when +returning+ is falsy. A String +returning+ is emitted verbatim
# after " OUTPUT ". Any other value is wrapped with Array() and every entry
# becomes "INSERTED.<quoted column>"; an aliased attribute is read from its
# underlying column and renamed back to the alias with AS. Entries are joined
# with "," (no space).
def build_sql_for_returning(insert:, insert_all:)
  requested = insert_all.returning
  return "" unless requested
  return " OUTPUT #{requested}" if requested.is_a?(String)

  model = insert.model
  output_columns = Array(requested).map do |attribute|
    next "INSERTED.#{quote_column_name(attribute)}" unless model.attribute_alias?(attribute)

    "INSERTED.#{quote_column_name(model.attribute_alias(attribute))} AS #{quote_column_name(attribute)}"
  end

  " OUTPUT #{output_columns.join(",")}"
end
527+
private :build_sql_for_returning
528+
529+
# Determines which groups of columns can produce a uniqueness conflict for this insert.
#
# Returns an Array of column groups:
# * when +unique_by+ is set on insert_all, exactly one group: that index's columns;
# * otherwise the columns of every unique index plus the primary keys, keeping
#   only the groups that are fully contained in insert.keys. A group that is not
#   fully inserted would lead to an invalid query, e.g. a unique index on
#   ["name"] is ignored when the insert keys are ["description"].
def get_columns_with_uniqueness_constraints(insert_all:, insert:)
  explicit_index = insert_all.unique_by
  return [explicit_index.columns] if explicit_index

  # Every unique constraint is a candidate, the primary key included.
  candidate_groups = insert_all.send(:unique_indexes).map(&:columns)
  candidate_groups << insert_all.primary_keys

  candidate_groups.select { |group| group.to_set.subset?(insert.keys) }
end
541+
private :get_columns_with_uniqueness_constraints
542+
543+
# Builds a plain INSERT statement (no duplicate handling).
#
# The statement is assembled from three fragments, in this order:
# "INSERT <insert.into>", the OUTPUT clause produced by build_sql_for_returning
# (an empty string when nothing is to be returned), and the values list
# preceded by a single space.
def build_sql_for_regular_insert(insert:)
  [
    "INSERT #{insert.into}",
    build_sql_for_returning(insert: insert, insert_all: insert.send(:insert_all)),
    " #{insert.values_list}"
  ].join
end
549+
private :build_sql_for_regular_insert
550+
551+
# Why is the PARTITION BY clause needed?
#
# Other database systems usually implement insert_all / upsert_all with an
# INSERT that lets you state what happens when duplicates are found (skip or
# update); by default, rows are considered unique according to every unique
# index on the table. SQL Server has to use MERGE instead, which in turn is a
# JOIN, so the de-duplication has to be performed by us. Otherwise the JOIN
# would complain about non-unique values and be unable to join the two tables.
# The easiest way to do this is to number the rows per partition and make sure
# that any record we try to insert is "the first one seen" across all the
# potential groups of columns with uniqueness constraints.
#
# Returns one "ROW_NUMBER() OVER (...) AS rn_<index>" expression per group of
# columns, joined with ", ". The index in the alias is the position of the
# group in columns_with_uniqueness_constraints.
def partition_by_columns_with_uniqueness_constraints(columns_with_uniqueness_constraints:)
  ranking_expressions = columns_with_uniqueness_constraints.each_with_index.map do |constraint_columns, position|
    partition_list = constraint_columns.map { |name| quote_column_name(name) }.join(",")
    ordering_list = constraint_columns.map { |name| "#{quote_column_name(name)} DESC" }.join(",")

    "ROW_NUMBER() OVER (\nPARTITION BY #{partition_list}\nORDER BY #{ordering_list}\n) AS rn_#{position}\n"
  end

  ranking_expressions.join(", ")
end
569+
private :partition_by_columns_with_uniqueness_constraints
570+
571+
# Builds the WHERE condition that keeps only the rows ranked first for every
# group of columns: "rn_0 = 1 AND rn_1 = 1 ...".
#
# One term is emitted per entry of columns_with_uniqueness_constraints; only the
# position of each entry is used, its content is not read. Returns "" for an
# empty list.
def is_first_record_across_all_uniqueness_constraints(columns_with_uniqueness_constraints:)
  columns_with_uniqueness_constraints.each_index.map { |position| "rn_#{position} = 1" }.join(" AND ")
end
576+
private :is_first_record_across_all_uniqueness_constraints
577+
578+
# Builds the join condition between the MERGE target and source.
#
# Within one group of columns every column must match (terms joined with
# " AND "); the groups themselves are alternatives and are separated by
# ") OR (". The outermost pair of parentheses is NOT part of the result, so the
# string is meant to be embedded between "(" and ")".
def joining_on_columns_with_uniqueness_constraints(columns_with_uniqueness_constraints:)
  per_constraint_conditions = columns_with_uniqueness_constraints.map do |columns|
    equalities = columns.map do |column|
      %w[target source].map { |side| "#{side}.#{quote_column_name(column)}" }.join(" = ")
    end

    equalities.join(" AND ")
  end

  per_constraint_conditions.join(") OR (")
end
585+
private :joining_on_columns_with_uniqueness_constraints
586+
587+
# Normally the CASE SQL is generated entirely by Rails and an adapter only hooks
# into "touch_model_timestamps_unless" to add its database-specific parts.
# Because every assignment here needs the "target." prefix, the CASE switch is
# generated by the adapter itself.
#
# For each update-timestamp attribute of the model that should be touched, emits
#
#   target.[ts]=CASE WHEN (<no updatable column changed>) THEN target.[ts] ELSE <now> END,
#
# Each fragment ends with a comma because further SET assignments are expected
# to follow it. Returns "" when no timestamp attribute is touched.
#
# The "unchanged" test is NULL-safe: two values are equal when they compare
# equal or when both are NULL. A string sentinel such as
# COALESCE(column, 'NULL') is deliberately not used: T-SQL converts the sentinel
# to the column's type, which fails for non-string columns (e.g. integers) that
# hold NULL, and it would treat the literal text 'NULL' as equal to a real NULL.
def build_sql_for_recording_timestamps_when_updating(insert:)
  # The condition does not depend on the timestamp column, so build it once.
  unchanged_condition = insert.updatable_columns.map do |column|
    quoted = quote_column_name(column)
    "(target.#{quoted} = source.#{quoted} OR (target.#{quoted} IS NULL AND source.#{quoted} IS NULL))"
  end.join(" AND ")

  insert.model.timestamp_attributes_for_update_in_model.filter_map do |column_name|
    next unless insert.send(:touch_timestamp_attribute?, column_name)

    quoted_timestamp = quote_column_name(column_name)
    "target.#{quoted_timestamp}=CASE WHEN (#{unchanged_condition}) THEN target.#{quoted_timestamp} ELSE #{high_precision_current_timestamp} END,"
  end.join
end
597+
private :build_sql_for_recording_timestamps_when_updating
468598
end
469599
end
470600
end

lib/active_record/connection_adapters/sqlserver/schema_statements.rb

+2
Original file line numberDiff line numberDiff line change
@@ -759,6 +759,8 @@ def get_raw_table_name(sql)
759759
.match(/\s*([^(]*)/i)[0]
760760
elsif s.match?(/^\s*UPDATE\s+.*/i)
761761
s.match(/UPDATE\s+([^\(\s]+)\s*/i)[1]
762+
elsif s.match?(/^\s*MERGE INTO.*/i)
763+
s.match(/^\s*MERGE\s+INTO\s+(\[?[a-z_ -]+\]?\.?\[?[a-z_ -]+\]?)\s+(AS|WITH|USING)/i)[1]
762764
else
763765
s.match(/FROM[\s|\(]+((\[[^\(\]]+\])|[^\(\s]+)\s*/i)[1]
764766
end.strip

lib/active_record/connection_adapters/sqlserver_adapter.rb

+2-2
Original file line numberDiff line numberDiff line change
@@ -216,11 +216,11 @@ def supports_insert_returning?
216216
end
217217

218218
def supports_insert_on_duplicate_skip?
219-
false
219+
true
220220
end
221221

222222
def supports_insert_on_duplicate_update?
223-
false
223+
true
224224
end
225225

226226
def supports_insert_conflict_target?

test/cases/adapter_test_sqlserver.rb

+11-1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ class AdapterTestSQLServer < ActiveRecord::TestCase
1313
fixtures :tasks
1414

1515
let(:basic_insert_sql) { "INSERT INTO [funny_jokes] ([name]) VALUES('Knock knock')" }
16+
let(:basic_merge_sql) { "MERGE INTO [ships] WITH (UPDLOCK, HOLDLOCK) AS target USING ( SELECT * FROM ( SELECT [id], [name], ROW_NUMBER() OVER ( PARTITION BY [id] ORDER BY [id] DESC ) AS rn_0 FROM ( VALUES (101, N'RSS Sir David Attenborough') ) AS t1 ([id], [name]) ) AS ranked_source WHERE rn_0 = 1 ) AS source ON (target.[id] = source.[id]) WHEN MATCHED THEN UPDATE SET target.[name] = source.[name]" }
1617
let(:basic_update_sql) { "UPDATE [customers] SET [address_street] = NULL WHERE [id] = 2" }
1718
let(:basic_select_sql) { "SELECT * FROM [customers] WHERE ([customers].[id] = 1)" }
1819

@@ -91,6 +92,7 @@ class AdapterTestSQLServer < ActiveRecord::TestCase
9192

9293
it "return unquoted table name object from basic INSERT UPDATE and SELECT statements" do
9394
assert_equal "funny_jokes", connection.send(:get_table_name, basic_insert_sql)
95+
assert_equal "ships", connection.send(:get_table_name, basic_merge_sql)
9496
assert_equal "customers", connection.send(:get_table_name, basic_update_sql)
9597
assert_equal "customers", connection.send(:get_table_name, basic_select_sql)
9698
end
@@ -219,6 +221,10 @@ class AdapterTestSQLServer < ActiveRecord::TestCase
219221
@identity_insert_sql_unquoted_sp = "EXEC sp_executesql N'INSERT INTO funny_jokes (id, name) VALUES (@0, @1)', N'@0 int, @1 nvarchar(255)', @0 = 420, @1 = N'Knock knock'"
220222
@identity_insert_sql_unordered_sp = "EXEC sp_executesql N'INSERT INTO [funny_jokes] ([name],[id]) VALUES (@0, @1)', N'@0 nvarchar(255), @1 int', @0 = N'Knock knock', @1 = 420"
221223

224+
@identity_merge_sql = "MERGE INTO [ships] WITH (UPDLOCK, HOLDLOCK) AS target USING ( SELECT * FROM ( SELECT [id], [name], ROW_NUMBER() OVER ( PARTITION BY [id] ORDER BY [id] DESC ) AS rn_0 FROM ( VALUES (101, N'RSS Sir David Attenborough') ) AS t1 ([id], [name]) ) AS ranked_source WHERE rn_0 = 1 ) AS source ON (target.[id] = source.[id]) WHEN MATCHED THEN UPDATE SET target.[name] = source.[name] WHEN NOT MATCHED BY TARGET THEN INSERT ([id], [name]) VALUES (source.[id], source.[name]) OUTPUT INSERTED.[id]"
225+
@identity_merge_sql_unquoted = "MERGE INTO ships WITH (UPDLOCK, HOLDLOCK) AS target USING ( SELECT * FROM ( SELECT id, name, ROW_NUMBER() OVER ( PARTITION BY id ORDER BY id DESC ) AS rn_0 FROM ( VALUES (101, N'RSS Sir David Attenborough') ) AS t1 (id, name) ) AS ranked_source WHERE rn_0 = 1 ) AS source ON (target.id = source.id) WHEN MATCHED THEN UPDATE SET target.name = source.name WHEN NOT MATCHED BY TARGET THEN INSERT (id, name) VALUES (source.id, source.name) OUTPUT INSERTED.id"
226+
@identity_merge_sql_unordered = "MERGE INTO [ships] WITH (UPDLOCK, HOLDLOCK) AS target USING ( SELECT * FROM ( SELECT [name], [id], ROW_NUMBER() OVER ( PARTITION BY [id] ORDER BY [id] DESC ) AS rn_0 FROM ( VALUES (101, N'RSS Sir David Attenborough') ) AS t1 ([name], [id]) ) AS ranked_source WHERE rn_0 = 1 ) AS source ON (target.[id] = source.[id]) WHEN MATCHED THEN UPDATE SET target.[name] = source.[name] WHEN NOT MATCHED BY TARGET THEN INSERT ([name], [id]) VALUES (source.[name], source.[id]) OUTPUT INSERTED.[id]"
227+
222228
@identity_insert_sql_non_dbo = "INSERT INTO [test].[aliens] ([id],[name]) VALUES(420,'Mork')"
223229
@identity_insert_sql_non_dbo_unquoted = "INSERT INTO test.aliens ([id],[name]) VALUES(420,'Mork')"
224230
@identity_insert_sql_non_dbo_unordered = "INSERT INTO [test].[aliens] ([name],[id]) VALUES('Mork',420)"
@@ -235,6 +241,10 @@ class AdapterTestSQLServer < ActiveRecord::TestCase
235241
assert_equal "[funny_jokes]", connection.send(:query_requires_identity_insert?, @identity_insert_sql_unquoted_sp)
236242
assert_equal "[funny_jokes]", connection.send(:query_requires_identity_insert?, @identity_insert_sql_unordered_sp)
237243

244+
assert_equal "[ships]", connection.send(:query_requires_identity_insert?, @identity_merge_sql)
245+
assert_equal "[ships]", connection.send(:query_requires_identity_insert?, @identity_merge_sql_unquoted)
246+
assert_equal "[ships]", connection.send(:query_requires_identity_insert?, @identity_merge_sql_unordered)
247+
238248
assert_equal "[test].[aliens]", connection.send(:query_requires_identity_insert?, @identity_insert_sql_non_dbo)
239249
assert_equal "[test].[aliens]", connection.send(:query_requires_identity_insert?, @identity_insert_sql_non_dbo_unquoted)
240250
assert_equal "[test].[aliens]", connection.send(:query_requires_identity_insert?, @identity_insert_sql_non_dbo_unordered)
@@ -244,7 +254,7 @@ class AdapterTestSQLServer < ActiveRecord::TestCase
244254
end
245255

246256
it "return false to #query_requires_identity_insert? for normal SQL" do
247-
[basic_insert_sql, basic_update_sql, basic_select_sql].each do |sql|
257+
[basic_insert_sql, basic_merge_sql, basic_update_sql, basic_select_sql].each do |sql|
248258
assert !connection.send(:query_requires_identity_insert?, sql), "SQL was #{sql}"
249259
end
250260
end

test/cases/coerced_tests.rb

+18
Original file line numberDiff line numberDiff line change
@@ -2552,6 +2552,24 @@ def test_insert_with_type_casting_and_serialize_is_consistent_coerced
25522552
Book.where(author_id: nil, name: '["Array"]').delete_all
25532553
Book.lease_connection.add_index(:books, [:author_id, :name], unique: true)
25542554
end
2555+
2556+
# Same as original but using target.status for assignment and CASE instead of GREATEST for operator
2557+
coerce_tests! :test_upsert_all_updates_using_provided_sql
2558+
def test_upsert_all_updates_using_provided_sql_coerced
2559+
Book.upsert_all(
2560+
[{id: 1, status: 1}, {id: 2, status: 1}],
2561+
on_duplicate: Arel.sql(<<~SQL
2562+
target.status = CASE
2563+
WHEN target.status > 1 THEN target.status
2564+
ELSE 1
2565+
END
2566+
SQL
2567+
)
2568+
)
2569+
2570+
assert_equal "published", Book.find(1).status
2571+
assert_equal "written", Book.find(2).status
2572+
end
25552573
end
25562574

25572575
module ActiveRecord

test/cases/schema_test_sqlserver.rb

+18
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,24 @@ class SchemaTestSQLServer < ActiveRecord::TestCase
102102
end
103103
end
104104

105+
describe "MERGE statements" do
106+
it do
107+
assert_equal "[dashboards]", connection.send(:get_raw_table_name, "MERGE INTO [dashboards] AS target")
108+
end
109+
110+
it do
111+
assert_equal "lock_without_defaults", connection.send(:get_raw_table_name, "MERGE INTO lock_without_defaults AS target")
112+
end
113+
114+
it do
115+
assert_equal "[WITH - SPACES]", connection.send(:get_raw_table_name, "MERGE INTO [WITH - SPACES] AS target")
116+
end
117+
118+
it do
119+
assert_equal "[with].[select notation]", connection.send(:get_raw_table_name, "MERGE INTO [with].[select notation] AS target")
120+
end
121+
end
122+
105123
describe "CREATE VIEW statements" do
106124
it do
107125
assert_equal "test_table_as", connection.send(:get_raw_table_name, "CREATE VIEW test_views ( test_table_a_id, test_table_b_id ) AS SELECT test_table_as.id as test_table_a_id, test_table_bs.id as test_table_b_id FROM (test_table_as with(nolock) LEFT JOIN test_table_bs with(nolock) ON (test_table_as.id = test_table_bs.test_table_a_id))")

0 commit comments

Comments
 (0)