Skip to content

Commit 69bccb9

Browse files
Authored by: opensearch-trigger-bot[bot], github-actions[bot], and andrross
[Backport 2.16] Change default batch size of bulk API to Integer.MAX_VALUE (opensearch-project#14882)

* Deprecate batch_size parameter on bulk API (opensearch-project#14725)

  By default the full _bulk payload will be passed to ingest processors as a
  batch, with any sub-batching logic to be implemented by each processor if
  necessary.

  Signed-off-by: Liyun Xiu <[email protected]>
  (cherry picked from commit 97f26cc)
  Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>

* Add missing ArrayList import

  Signed-off-by: Andrew Ross <[email protected]>

---------

Signed-off-by: Liyun Xiu <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Signed-off-by: Andrew Ross <[email protected]>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: Andrew Ross <[email protected]>
1 parent 1d634b4 commit 69bccb9

File tree

7 files changed

+142
-101
lines changed

7 files changed

+142
-101
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
5656
- Allow system index warning in OpenSearchRestTestCase.refreshAllIndices ([#14635](https://github.com/opensearch-project/OpenSearch/pull/14635))
5757

5858
### Deprecated
59+
- Deprecate batch_size parameter on bulk API ([#14725](https://github.com/opensearch-project/OpenSearch/pull/14725))
5960

6061
### Removed
6162
- Remove query categorization changes ([#14759](https://github.com/opensearch-project/OpenSearch/pull/14759))

modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/70_bulk.yml

+1-32
Original file line numberDiff line numberDiff line change
@@ -207,15 +207,14 @@ teardown:
207207
- match: { _source: {"f1": "v2", "f2": 47, "field1": "value1", "field2": "value2"}}
208208

209209
---
210-
"Test bulk API with batch enabled happy case":
210+
"Test bulk API with default batch size":
211211
- skip:
212212
version: " - 2.13.99"
213213
reason: "Added in 2.14.0"
214214

215215
- do:
216216
bulk:
217217
refresh: true
218-
batch_size: 2
219218
pipeline: "pipeline1"
220219
body:
221220
- '{"index": {"_index": "test_index", "_id": "test_id1"}}'
@@ -245,36 +244,6 @@ teardown:
245244
id: test_id3
246245
- match: { _source: { "text": "text3", "field1": "value1" } }
247246

248-
---
249-
"Test bulk API with batch_size missing":
250-
- skip:
251-
version: " - 2.13.99"
252-
reason: "Added in 2.14.0"
253-
254-
- do:
255-
bulk:
256-
refresh: true
257-
pipeline: "pipeline1"
258-
body:
259-
- '{"index": {"_index": "test_index", "_id": "test_id1"}}'
260-
- '{"text": "text1"}'
261-
- '{"index": {"_index": "test_index", "_id": "test_id2"}}'
262-
- '{"text": "text2"}'
263-
264-
- match: { errors: false }
265-
266-
- do:
267-
get:
268-
index: test_index
269-
id: test_id1
270-
- match: { _source: { "text": "text1", "field1": "value1" } }
271-
272-
- do:
273-
get:
274-
index: test_index
275-
id: test_id2
276-
- match: { _source: { "text": "text2", "field1": "value1" } }
277-
278247
---
279248
"Test bulk API with invalid batch_size":
280249
- skip:

server/src/internalClusterTest/java/org/opensearch/ingest/IngestClientIT.java

+81
Original file line numberDiff line numberDiff line change
@@ -357,6 +357,87 @@ public void testBulkWithUpsert() throws Exception {
357357
assertThat(upserted.get("processed"), equalTo(true));
358358
}
359359

360+
public void testSingleDocIngestFailure() throws Exception {
361+
createIndex("test");
362+
BytesReference source = BytesReference.bytes(
363+
jsonBuilder().startObject()
364+
.field("description", "my_pipeline")
365+
.startArray("processors")
366+
.startObject()
367+
.startObject("test")
368+
.endObject()
369+
.endObject()
370+
.endArray()
371+
.endObject()
372+
);
373+
PutPipelineRequest putPipelineRequest = new PutPipelineRequest("_id", source, MediaTypeRegistry.JSON);
374+
client().admin().cluster().putPipeline(putPipelineRequest).get();
375+
376+
GetPipelineRequest getPipelineRequest = new GetPipelineRequest("_id");
377+
GetPipelineResponse getResponse = client().admin().cluster().getPipeline(getPipelineRequest).get();
378+
assertThat(getResponse.isFound(), is(true));
379+
assertThat(getResponse.pipelines().size(), equalTo(1));
380+
assertThat(getResponse.pipelines().get(0).getId(), equalTo("_id"));
381+
382+
assertThrows(
383+
IllegalArgumentException.class,
384+
() -> client().prepareIndex("test")
385+
.setId("1")
386+
.setPipeline("_id")
387+
.setSource(Requests.INDEX_CONTENT_TYPE, "field", "value", "fail", true)
388+
.get()
389+
);
390+
391+
DeletePipelineRequest deletePipelineRequest = new DeletePipelineRequest("_id");
392+
AcknowledgedResponse response = client().admin().cluster().deletePipeline(deletePipelineRequest).get();
393+
assertThat(response.isAcknowledged(), is(true));
394+
395+
getResponse = client().admin().cluster().prepareGetPipeline("_id").get();
396+
assertThat(getResponse.isFound(), is(false));
397+
assertThat(getResponse.pipelines().size(), equalTo(0));
398+
}
399+
400+
public void testSingleDocIngestDrop() throws Exception {
401+
createIndex("test");
402+
BytesReference source = BytesReference.bytes(
403+
jsonBuilder().startObject()
404+
.field("description", "my_pipeline")
405+
.startArray("processors")
406+
.startObject()
407+
.startObject("test")
408+
.endObject()
409+
.endObject()
410+
.endArray()
411+
.endObject()
412+
);
413+
PutPipelineRequest putPipelineRequest = new PutPipelineRequest("_id", source, MediaTypeRegistry.JSON);
414+
client().admin().cluster().putPipeline(putPipelineRequest).get();
415+
416+
GetPipelineRequest getPipelineRequest = new GetPipelineRequest("_id");
417+
GetPipelineResponse getResponse = client().admin().cluster().getPipeline(getPipelineRequest).get();
418+
assertThat(getResponse.isFound(), is(true));
419+
assertThat(getResponse.pipelines().size(), equalTo(1));
420+
assertThat(getResponse.pipelines().get(0).getId(), equalTo("_id"));
421+
422+
DocWriteResponse indexResponse = client().prepareIndex("test")
423+
.setId("1")
424+
.setPipeline("_id")
425+
.setSource(Requests.INDEX_CONTENT_TYPE, "field", "value", "drop", true)
426+
.get();
427+
assertEquals(DocWriteResponse.Result.NOOP, indexResponse.getResult());
428+
429+
Map<String, Object> doc = client().prepareGet("test", "1").get().getSourceAsMap();
430+
assertNull(doc);
431+
432+
DeletePipelineRequest deletePipelineRequest = new DeletePipelineRequest("_id");
433+
AcknowledgedResponse response = client().admin().cluster().deletePipeline(deletePipelineRequest).get();
434+
assertThat(response.isAcknowledged(), is(true));
435+
436+
getResponse = client().admin().cluster().prepareGetPipeline("_id").get();
437+
assertThat(getResponse.isFound(), is(false));
438+
assertThat(getResponse.pipelines().size(), equalTo(0));
439+
}
440+
360441
public void test() throws Exception {
361442
BytesReference source = BytesReference.bytes(
362443
jsonBuilder().startObject()

server/src/main/java/org/opensearch/action/bulk/BulkRequest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
9696
private String globalRouting;
9797
private String globalIndex;
9898
private Boolean globalRequireAlias;
99-
private int batchSize = 1;
99+
private int batchSize = Integer.MAX_VALUE;
100100

101101
private long sizeInBytes = 0;
102102

server/src/main/java/org/opensearch/ingest/IngestService.java

+3-61
Original file line numberDiff line numberDiff line change
@@ -525,61 +525,7 @@ public void onFailure(Exception e) {
525525

526526
@Override
527527
protected void doRun() {
528-
int batchSize = originalBulkRequest.batchSize();
529-
if (shouldExecuteBulkRequestInBatch(originalBulkRequest.requests().size(), batchSize)) {
530-
runBulkRequestInBatch(numberOfActionRequests, actionRequests, onFailure, onCompletion, onDropped, originalBulkRequest);
531-
return;
532-
}
533-
534-
final Thread originalThread = Thread.currentThread();
535-
final AtomicInteger counter = new AtomicInteger(numberOfActionRequests);
536-
int i = 0;
537-
for (DocWriteRequest<?> actionRequest : actionRequests) {
538-
IndexRequest indexRequest = TransportBulkAction.getIndexWriteRequest(actionRequest);
539-
if (indexRequest == null) {
540-
if (counter.decrementAndGet() == 0) {
541-
onCompletion.accept(originalThread, null);
542-
}
543-
assert counter.get() >= 0;
544-
i++;
545-
continue;
546-
}
547-
final String pipelineId = indexRequest.getPipeline();
548-
indexRequest.setPipeline(NOOP_PIPELINE_NAME);
549-
final String finalPipelineId = indexRequest.getFinalPipeline();
550-
indexRequest.setFinalPipeline(NOOP_PIPELINE_NAME);
551-
boolean hasFinalPipeline = true;
552-
final List<String> pipelines;
553-
if (IngestService.NOOP_PIPELINE_NAME.equals(pipelineId) == false
554-
&& IngestService.NOOP_PIPELINE_NAME.equals(finalPipelineId) == false) {
555-
pipelines = Arrays.asList(pipelineId, finalPipelineId);
556-
} else if (IngestService.NOOP_PIPELINE_NAME.equals(pipelineId) == false) {
557-
pipelines = Collections.singletonList(pipelineId);
558-
hasFinalPipeline = false;
559-
} else if (IngestService.NOOP_PIPELINE_NAME.equals(finalPipelineId) == false) {
560-
pipelines = Collections.singletonList(finalPipelineId);
561-
} else {
562-
if (counter.decrementAndGet() == 0) {
563-
onCompletion.accept(originalThread, null);
564-
}
565-
assert counter.get() >= 0;
566-
i++;
567-
continue;
568-
}
569-
570-
executePipelines(
571-
i,
572-
pipelines.iterator(),
573-
hasFinalPipeline,
574-
indexRequest,
575-
onDropped,
576-
onFailure,
577-
counter,
578-
onCompletion,
579-
originalThread
580-
);
581-
i++;
582-
}
528+
runBulkRequestInBatch(numberOfActionRequests, actionRequests, onFailure, onCompletion, onDropped, originalBulkRequest);
583529
}
584530
});
585531
}
@@ -635,7 +581,7 @@ private void runBulkRequestInBatch(
635581
i++;
636582
}
637583

638-
int batchSize = originalBulkRequest.batchSize();
584+
int batchSize = Math.min(numberOfActionRequests, originalBulkRequest.batchSize());
639585
List<List<IndexRequestWrapper>> batches = prepareBatches(batchSize, indexRequestWrappers);
640586
logger.debug("batchSize: {}, batches: {}", batchSize, batches.size());
641587

@@ -654,10 +600,6 @@ private void runBulkRequestInBatch(
654600
}
655601
}
656602

657-
private boolean shouldExecuteBulkRequestInBatch(int documentSize, int batchSize) {
658-
return documentSize > 1 && batchSize > 1;
659-
}
660-
661603
/**
662604
* IndexRequests are grouped by unique (index + pipeline_ids) before batching.
663605
* Only IndexRequests in the same group could be batched. It's to ensure batched documents always
@@ -685,7 +627,7 @@ static List<List<IndexRequestWrapper>> prepareBatches(int batchSize, List<IndexR
685627
}
686628
List<List<IndexRequestWrapper>> batchedIndexRequests = new ArrayList<>();
687629
for (Map.Entry<Integer, List<IndexRequestWrapper>> indexRequestsPerKey : indexRequestsPerIndexAndPipelines.entrySet()) {
688-
for (int i = 0; i < indexRequestsPerKey.getValue().size(); i += batchSize) {
630+
for (int i = 0; i < indexRequestsPerKey.getValue().size(); i += Math.min(indexRequestsPerKey.getValue().size(), batchSize)) {
689631
batchedIndexRequests.add(
690632
new ArrayList<>(
691633
indexRequestsPerKey.getValue().subList(i, i + Math.min(batchSize, indexRequestsPerKey.getValue().size() - i))

server/src/main/java/org/opensearch/rest/action/document/RestBulkAction.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.opensearch.action.support.ActiveShardCount;
3939
import org.opensearch.client.Requests;
4040
import org.opensearch.client.node.NodeClient;
41+
import org.opensearch.common.logging.DeprecationLogger;
4142
import org.opensearch.common.settings.Settings;
4243
import org.opensearch.rest.BaseRestHandler;
4344
import org.opensearch.rest.RestRequest;
@@ -66,6 +67,8 @@
6667
public class RestBulkAction extends BaseRestHandler {
6768

6869
private final boolean allowExplicitIndex;
70+
private static final DeprecationLogger deprecationLogger = DeprecationLogger.getLogger(RestBulkAction.class);
71+
static final String BATCH_SIZE_DEPRECATED_MESSAGE = "The batch size option in bulk API is deprecated and will be removed in 3.0.";
6972

7073
public RestBulkAction(Settings settings) {
7174
this.allowExplicitIndex = MULTI_ALLOW_EXPLICIT_INDEX.get(settings);
@@ -97,7 +100,10 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
97100
Boolean defaultRequireAlias = request.paramAsBoolean(DocWriteRequest.REQUIRE_ALIAS, null);
98101
bulkRequest.timeout(request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT));
99102
bulkRequest.setRefreshPolicy(request.param("refresh"));
100-
bulkRequest.batchSize(request.paramAsInt("batch_size", 1));
103+
if (request.hasParam("batch_size")) {
104+
deprecationLogger.deprecate("batch_size_deprecation", BATCH_SIZE_DEPRECATED_MESSAGE);
105+
}
106+
bulkRequest.batchSize(request.paramAsInt("batch_size", Integer.MAX_VALUE));
101107
bulkRequest.add(
102108
request.requiredContent(),
103109
defaultIndex,

server/src/test/java/org/opensearch/ingest/IngestServiceTests.java

+48-6
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@
8181
import org.junit.Before;
8282

8383
import java.nio.charset.StandardCharsets;
84+
import java.util.ArrayList;
8485
import java.util.Arrays;
8586
import java.util.Collections;
8687
import java.util.Comparator;
@@ -1133,10 +1134,14 @@ public void testBulkRequestExecutionWithFailures() throws Exception {
11331134
Exception error = new RuntimeException();
11341135
doAnswer(args -> {
11351136
@SuppressWarnings("unchecked")
1136-
BiConsumer<IngestDocument, Exception> handler = (BiConsumer) args.getArguments()[1];
1137-
handler.accept(null, error);
1137+
List<IngestDocumentWrapper> ingestDocumentWrappers = (List) args.getArguments()[0];
1138+
Consumer<List<IngestDocumentWrapper>> handler = (Consumer) args.getArguments()[1];
1139+
for (IngestDocumentWrapper wrapper : ingestDocumentWrappers) {
1140+
wrapper.update(wrapper.getIngestDocument(), error);
1141+
}
1142+
handler.accept(ingestDocumentWrappers);
11381143
return null;
1139-
}).when(processor).execute(any(), any());
1144+
}).when(processor).batchExecute(any(), any());
11401145
IngestService ingestService = createWithProcessors(
11411146
Collections.singletonMap("mock", (factories, tag, description, config) -> processor)
11421147
);
@@ -1191,10 +1196,11 @@ public void testBulkRequestExecution() throws Exception {
11911196
when(processor.getTag()).thenReturn("mockTag");
11921197
doAnswer(args -> {
11931198
@SuppressWarnings("unchecked")
1194-
BiConsumer<IngestDocument, Exception> handler = (BiConsumer) args.getArguments()[1];
1195-
handler.accept(RandomDocumentPicks.randomIngestDocument(random()), null);
1199+
List<IngestDocumentWrapper> ingestDocumentWrappers = (List) args.getArguments()[0];
1200+
Consumer<List<IngestDocumentWrapper>> handler = (Consumer) args.getArguments()[1];
1201+
handler.accept(ingestDocumentWrappers);
11961202
return null;
1197-
}).when(processor).execute(any(), any());
1203+
}).when(processor).batchExecute(any(), any());
11981204
Map<String, Processor.Factory> map = new HashMap<>(2);
11991205
map.put("mock", (factories, tag, description, config) -> processor);
12001206

@@ -1962,6 +1968,42 @@ public void testExecuteBulkRequestInBatchWithExceptionAndDropInCallback() {
19621968
verify(mockCompoundProcessor, never()).execute(any(), any());
19631969
}
19641970

1971+
public void testExecuteBulkRequestInBatchWithDefaultBatchSize() {
1972+
CompoundProcessor mockCompoundProcessor = mockCompoundProcessor();
1973+
IngestService ingestService = createWithProcessors(
1974+
Collections.singletonMap("mock", (factories, tag, description, config) -> mockCompoundProcessor)
1975+
);
1976+
createPipeline("_id", ingestService);
1977+
BulkRequest bulkRequest = new BulkRequest();
1978+
IndexRequest indexRequest1 = new IndexRequest("_index").id("_id1").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none");
1979+
bulkRequest.add(indexRequest1);
1980+
IndexRequest indexRequest2 = new IndexRequest("_index").id("_id2").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none");
1981+
bulkRequest.add(indexRequest2);
1982+
IndexRequest indexRequest3 = new IndexRequest("_index").id("_id3").source(emptyMap()).setPipeline("_none").setFinalPipeline("_id");
1983+
bulkRequest.add(indexRequest3);
1984+
IndexRequest indexRequest4 = new IndexRequest("_index").id("_id4").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none");
1985+
bulkRequest.add(indexRequest4);
1986+
@SuppressWarnings("unchecked")
1987+
final Map<Integer, Exception> failureHandler = new HashMap<>();
1988+
final Map<Thread, Exception> completionHandler = new HashMap<>();
1989+
final List<Integer> dropHandler = new ArrayList<>();
1990+
ingestService.executeBulkRequest(
1991+
4,
1992+
bulkRequest.requests(),
1993+
failureHandler::put,
1994+
completionHandler::put,
1995+
dropHandler::add,
1996+
Names.WRITE,
1997+
bulkRequest
1998+
);
1999+
assertTrue(failureHandler.isEmpty());
2000+
assertTrue(dropHandler.isEmpty());
2001+
assertEquals(1, completionHandler.size());
2002+
assertNull(completionHandler.get(Thread.currentThread()));
2003+
verify(mockCompoundProcessor, times(1)).batchExecute(any(), any());
2004+
verify(mockCompoundProcessor, never()).execute(any(), any());
2005+
}
2006+
19652007
public void testPrepareBatches_same_index_pipeline() {
19662008
IngestService.IndexRequestWrapper wrapper1 = createIndexRequestWrapper("index1", Collections.singletonList("p1"));
19672009
IngestService.IndexRequestWrapper wrapper2 = createIndexRequestWrapper("index1", Collections.singletonList("p1"));

0 commit comments

Comments (0)