Small refactor of Support insert_all and upsert_all using MERGE #1314

Merged
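For context, the diff below refactors the code path that Active Record's insert_all/upsert_all take on SQL Server, extracting the MERGE branch into its own helper. A minimal sketch of the kind of call that exercises this path, assuming a hypothetical Book model with a unique index on isbn (model, column, and index names are illustrative only, not taken from the PR):

# Hypothetical model/table; any applicable unique index makes the adapter
# build a MERGE statement instead of a plain INSERT.
Book.insert_all([
  { isbn: "111", title: "First" },
  { isbn: "111", title: "Same isbn again, skipped as a duplicate" }
])

# upsert_all updates the matching row instead of skipping it, which maps to
# the "WHEN MATCHED THEN UPDATE SET" branch in the code below.
Book.upsert_all(
  [{ isbn: "111", title: "Revised title" }],
  unique_by: :index_books_on_isbn # hypothetical index name
)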
@@ -154,50 +154,54 @@ def default_insert_value(column)
 private :default_insert_value
 
 def build_insert_sql(insert) # :nodoc:
-  if insert.skip_duplicates? || insert.update_duplicates?
-    insert_all = insert.send(:insert_all)
-    columns_with_uniqueness_constraints = get_columns_with_uniqueness_constraints(insert_all:, insert:)
-
-    # if we do not have any columns that might have conflicting values, just execute a regular insert
-    return build_sql_for_regular_insert(insert) if columns_with_uniqueness_constraints.flatten.empty?
-
-    sql = <<~SQL
-      MERGE INTO #{insert.model.quoted_table_name} WITH (UPDLOCK, HOLDLOCK) AS target
-      USING (
-        SELECT *
-        FROM (
-          SELECT #{insert.send(:columns_list)}, #{partition_by_columns_with_uniqueness_constraints(columns_with_uniqueness_constraints:)}
-          FROM (#{insert.values_list})
-          AS t1 (#{insert.send(:columns_list)})
-        ) AS ranked_source
-        WHERE #{is_first_record_across_all_uniqueness_constraints(columns_with_uniqueness_constraints:)}
-      ) AS source
-      ON (#{joining_on_columns_with_uniqueness_constraints(columns_with_uniqueness_constraints:)})
-    SQL
-
-    if insert.update_duplicates?
-      sql << " WHEN MATCHED THEN UPDATE SET "
-
-      if insert.raw_update_sql?
-        sql << insert.raw_update_sql
-      else
-        if insert.record_timestamps?
-          sql << build_sql_for_recording_timestamps_when_updating(insert:)
-        end
-
-        sql << insert.updatable_columns.map { |column| "target.#{quote_column_name(column)}=source.#{quote_column_name(column)}" }.join(",")
-      end
-    end
-    sql << " WHEN NOT MATCHED BY TARGET THEN"
-    sql << " INSERT (#{insert.send(:columns_list)}) VALUES (#{insert_all.keys_including_timestamps.map { |column| "source.#{quote_column_name(column)}" }.join(", ")})"
-    sql << build_sql_for_returning(insert:, insert_all: insert.send(:insert_all))
-
-    sql << ";"
-
-    return sql
-  end
-
-  build_sql_for_regular_insert(insert)
+  # Use regular insert if not skipping/updating duplicates.
+  return build_sql_for_regular_insert(insert:) unless insert.skip_duplicates? || insert.update_duplicates?
+
+  insert_all = insert.send(:insert_all)
+  columns_with_uniqueness_constraints = get_columns_with_uniqueness_constraints(insert_all:, insert:)
+
+  # If we do not have any columns that might have conflicting values just execute a regular insert, else use merge.
+  if columns_with_uniqueness_constraints.flatten.empty?
+    build_sql_for_regular_insert(insert:)
+  else
+    build_sql_for_merge_insert(insert:, insert_all:, columns_with_uniqueness_constraints:)
+  end
+end
+
+def build_sql_for_merge_insert(insert:, insert_all:, columns_with_uniqueness_constraints:) # :nodoc:
+  sql = <<~SQL
+    MERGE INTO #{insert.model.quoted_table_name} WITH (UPDLOCK, HOLDLOCK) AS target
+    USING (
+      SELECT *
+      FROM (
+        SELECT #{insert.send(:columns_list)}, #{partition_by_columns_with_uniqueness_constraints(columns_with_uniqueness_constraints:)}
+        FROM (#{insert.values_list})
+        AS t1 (#{insert.send(:columns_list)})
+      ) AS ranked_source
+      WHERE #{is_first_record_across_all_uniqueness_constraints(columns_with_uniqueness_constraints:)}
+    ) AS source
+    ON (#{joining_on_columns_with_uniqueness_constraints(columns_with_uniqueness_constraints:)})
+  SQL
+
+  if insert.update_duplicates?
+    sql << " WHEN MATCHED THEN UPDATE SET "
+
+    if insert.raw_update_sql?
+      sql << insert.raw_update_sql
+    else
+      if insert.record_timestamps?
+        sql << build_sql_for_recording_timestamps_when_updating(insert:)
+      end
+
+      sql << insert.updatable_columns.map { |column| "target.#{quote_column_name(column)}=source.#{quote_column_name(column)}" }.join(",")
+    end
+  end
+  sql << " WHEN NOT MATCHED BY TARGET THEN"
+  sql << " INSERT (#{insert.send(:columns_list)}) VALUES (#{insert_all.keys_including_timestamps.map { |column| "source.#{quote_column_name(column)}" }.join(", ")})"
+  sql << build_sql_for_returning(insert:, insert_all: insert.send(:insert_all))
+  sql << ";"
+
+  sql
 end
 
 # === SQLServer Specific ======================================== #
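Under the hypothetical Book/isbn example above, the statement assembled by build_sql_for_merge_insert comes out roughly in the following shape; the ranking column produced by partition_by_columns_with_uniqueness_constraints, the identifier quoting, and the timestamp columns are simplified assumptions here, not the adapter's exact output:

MERGE INTO [books] WITH (UPDLOCK, HOLDLOCK) AS target
USING (
  SELECT *
  FROM (
    SELECT [isbn], [title],
           ROW_NUMBER() OVER (PARTITION BY [isbn] ORDER BY [isbn]) AS rn_0 -- assumed ranking expression
    FROM (VALUES (N'111', N'Revised title')) AS t1 ([isbn], [title])
  ) AS ranked_source
  WHERE rn_0 = 1
) AS source
ON (target.[isbn] = source.[isbn])
WHEN MATCHED THEN UPDATE SET target.[title] = source.[title]
WHEN NOT MATCHED BY TARGET THEN
  INSERT ([isbn], [title]) VALUES (source.[isbn], source.[title]);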
@@ -536,11 +540,9 @@ def get_columns_with_uniqueness_constraints(insert_all:, insert:)
 end
 private :get_columns_with_uniqueness_constraints
 
-def build_sql_for_regular_insert(insert)
+def build_sql_for_regular_insert(insert:)
   sql = "INSERT #{insert.into}"
-
   sql << build_sql_for_returning(insert:, insert_all: insert.send(:insert_all))
-
   sql << " #{insert.values_list}"
   sql
 end
@@ -553,7 +555,7 @@ def build_sql_for_regular_insert(insert)
 # but since we have to use MERGE in MSSQL, which in return is a JOIN, we have to perform the "de-duplication" ourselves
 # otherwise the "JOIN" clause would complain about non-unique values and being unable to JOIN the two tables
 # this works easiest by using PARTITION and make sure that any record
-# we are trying to insert is "the first one seen across all the potential columns with uniquness constraints"
+# we are trying to insert is "the first one seen across all the potential columns with uniqueness constraints"
 def partition_by_columns_with_uniqueness_constraints(columns_with_uniqueness_constraints:)
   columns_with_uniqueness_constraints.map.with_index do |group_of_columns_with_uniqueness_constraints, index|
     <<~PARTITION_BY