|
| 1 | +import os |
| 2 | +import random |
| 3 | +import tarfile |
| 4 | +from collections import defaultdict |
| 5 | +from unittest.mock import patch |
| 6 | + |
| 7 | +from parameterized import parameterized |
| 8 | +from torchtext.datasets.stsb import STSB |
| 9 | + |
| 10 | +from ..common.case_utils import TempDirMixin, zip_equal, get_random_unicode |
| 11 | +from ..common.torchtext_test_case import TorchtextTestCase |
| 12 | + |
| 13 | + |
| 14 | +def _get_mock_dataset(root_dir): |
| 15 | + """ |
| 16 | + root_dir: directory to the mocked dataset |
| 17 | + """ |
| 18 | + base_dir = os.path.join(root_dir, "STSB") |
| 19 | + temp_dataset_dir = os.path.join(base_dir, "stsbenchmark") |
| 20 | + os.makedirs(temp_dataset_dir, exist_ok=True) |
| 21 | + |
| 22 | + seed = 1 |
| 23 | + mocked_data = defaultdict(list) |
| 24 | + for file_name, name in zip(["sts-train.csv", "sts-dev.csv" "sts-test.csv"], ["train", "dev", "test"]): |
| 25 | + txt_file = os.path.join(temp_dataset_dir, file_name) |
| 26 | + with open(txt_file, "w", encoding="utf-8") as f: |
| 27 | + for i in range(5): |
| 28 | + label = random.uniform(0, 5) |
| 29 | + rand_string_1 = get_random_unicode(seed) |
| 30 | + rand_string_2 = get_random_unicode(seed + 1) |
| 31 | + rand_string_3 = get_random_unicode(seed + 2) |
| 32 | + rand_string_4 = get_random_unicode(seed + 3) |
| 33 | + rand_string_5 = get_random_unicode(seed + 4) |
| 34 | + dataset_line = (i, label, rand_string_4, rand_string_5) |
| 35 | + # append line to correct dataset split |
| 36 | + mocked_data[name].append(dataset_line) |
| 37 | + f.write( |
| 38 | + f"{rand_string_1}\t{rand_string_2}\t{rand_string_3}\t{i}\t{label}\t{rand_string_4}\t{rand_string_5}\n" |
| 39 | + ) |
| 40 | + seed += 1 |
| 41 | + # case with quotes to test arg `quoting=csv.QUOTE_NONE` |
| 42 | + dataset_line = (i, label, rand_string_4, rand_string_5) |
| 43 | + # append line to correct dataset split |
| 44 | + mocked_data[name].append(dataset_line) |
| 45 | + f.write( |
| 46 | + f'{rand_string_1}"\t"{rand_string_2}\t{rand_string_3}\t{i}\t{label}\t{rand_string_4}\t{rand_string_5}\n' |
| 47 | + ) |
| 48 | + |
| 49 | + compressed_dataset_path = os.path.join(base_dir, "Stsbenchmark.tar.gz") |
| 50 | + # create tar file from dataset folder |
| 51 | + with tarfile.open(compressed_dataset_path, "w:gz") as tar: |
| 52 | + tar.add(temp_dataset_dir, arcname="stsbenchmark") |
| 53 | + |
| 54 | + return mocked_data |
| 55 | + |
| 56 | + |
| 57 | +class TestSTSB(TempDirMixin, TorchtextTestCase): |
| 58 | + root_dir = None |
| 59 | + samples = [] |
| 60 | + |
| 61 | + @classmethod |
| 62 | + def setUpClass(cls): |
| 63 | + super().setUpClass() |
| 64 | + cls.root_dir = cls.get_base_temp_dir() |
| 65 | + cls.samples = _get_mock_dataset(cls.root_dir) |
| 66 | + cls.patcher = patch("torchdata.datapipes.iter.util.cacheholder._hash_check", return_value=True) |
| 67 | + cls.patcher.start() |
| 68 | + |
| 69 | + @classmethod |
| 70 | + def tearDownClass(cls): |
| 71 | + cls.patcher.stop() |
| 72 | + super().tearDownClass() |
| 73 | + |
| 74 | + @parameterized.expand(["train", "dev", "test"]) |
| 75 | + def test_stsb(self, split): |
| 76 | + dataset = STSB(root=self.root_dir, split=split) |
| 77 | + |
| 78 | + samples = list(dataset) |
| 79 | + expected_samples = self.samples[split] |
| 80 | + for sample, expected_sample in zip_equal(samples, expected_samples): |
| 81 | + self.assertEqual(sample, expected_sample) |
| 82 | + |
| 83 | + @parameterized.expand(["train", "dev", "test"]) |
| 84 | + def test_stsb_split_argument(self, split): |
| 85 | + dataset1 = STSB(root=self.root_dir, split=split) |
| 86 | + (dataset2,) = STSB(root=self.root_dir, split=(split,)) |
| 87 | + |
| 88 | + for d1, d2 in zip_equal(dataset1, dataset2): |
| 89 | + self.assertEqual(d1, d2) |
0 commit comments