|
7 | 7 | """
|
8 | 8 |
|
9 | 9 | import threading
|
| 10 | +import unittest |
10 | 11 |
|
11 | 12 | import cx_Oracle as oracledb
|
12 | 13 | import test_env
|
13 | 14 |
|
14 |
| -class SubscriptionData(object): |
| 15 | +class SubscriptionData: |
15 | 16 |
|
16 | 17 | def __init__(self, num_messages_expected):
|
17 | 18 | self.condition = threading.Condition()
|
18 | 19 | self.num_messages_expected = num_messages_expected
|
19 | 20 | self.num_messages_received = 0
|
20 |
| - self.table_operations = [] |
21 |
| - self.row_operations = [] |
22 |
| - self.rowids = [] |
23 | 21 |
|
24 |
| - def CallbackHandler(self, message): |
| 22 | + def _process_message(self, message): |
| 23 | + pass |
| 24 | + |
| 25 | + def callback_handler(self, message): |
25 | 26 | if message.type != oracledb.EVENT_DEREG:
|
26 |
| - table, = message.tables |
27 |
| - self.table_operations.append(table.operation) |
28 |
| - for row in table.rows: |
29 |
| - self.row_operations.append(row.operation) |
30 |
| - self.rowids.append(row.rowid) |
| 27 | + self._process_message(message) |
31 | 28 | self.num_messages_received += 1
|
32 | 29 | if message.type == oracledb.EVENT_DEREG or \
|
33 | 30 | self.num_messages_received == self.num_messages_expected:
|
34 |
| - self.condition.acquire() |
35 |
| - self.condition.notify() |
36 |
| - self.condition.release() |
| 31 | + with self.condition: |
| 32 | + self.condition.notify() |
| 33 | + |
| 34 | + def wait_for_messages(self): |
| 35 | + if self.num_messages_received < self.num_messages_expected: |
| 36 | + with self.condition: |
| 37 | + self.condition.wait(10) |
| 38 | + |
| 39 | + |
| 40 | +class AQSubscriptionData(SubscriptionData): |
| 41 | + pass |
| 42 | + |
| 43 | + |
| 44 | +class DMLSubscriptionData(SubscriptionData): |
| 45 | + |
| 46 | + def __init__(self, num_messages_expected): |
| 47 | + super().__init__(num_messages_expected) |
| 48 | + self.table_operations = [] |
| 49 | + self.row_operations = [] |
| 50 | + self.rowids = [] |
| 51 | + |
| 52 | + def _process_message(self, message): |
| 53 | + table, = message.tables |
| 54 | + self.table_operations.append(table.operation) |
| 55 | + for row in table.rows: |
| 56 | + self.row_operations.append(row.operation) |
| 57 | + self.rowids.append(row.rowid) |
37 | 58 |
|
38 | 59 |
|
39 | 60 | class TestCase(test_env.BaseTestCase):
|
40 | 61 |
|
41 |
| - def test_3000_subscription(self): |
42 |
| - "3000 - test Subscription for insert, update, delete and truncate" |
| 62 | + def test_3000_dml_subscription(self): |
| 63 | + "3000 - test subscription for insert, update, delete and truncate" |
43 | 64 |
|
44 | 65 | # skip if running on the Oracle Cloud, which does not support
|
45 | 66 | # subscriptions currently
|
@@ -67,9 +88,9 @@ def test_3000_subscription(self):
|
67 | 88 | rowids = []
|
68 | 89 |
|
69 | 90 | # set up subscription
|
70 |
| - data = SubscriptionData(5) |
| 91 | + data = DMLSubscriptionData(5) |
71 | 92 | connection = test_env.get_connection(threaded=True, events=True)
|
72 |
| - sub = connection.subscribe(callback=data.CallbackHandler, |
| 93 | + sub = connection.subscribe(callback=data.callback_handler, |
73 | 94 | timeout=10, qos=oracledb.SUBSCR_QOS_ROWIDS)
|
74 | 95 | sub.registerquery("select * from TestTempTable")
|
75 | 96 | connection.autocommit = True
|
@@ -105,8 +126,7 @@ def test_3000_subscription(self):
|
105 | 126 | cursor.execute("truncate table TestTempTable")
|
106 | 127 |
|
107 | 128 | # wait for all messages to be sent
|
108 |
| - data.condition.acquire() |
109 |
| - data.condition.wait(10) |
| 129 | + data.wait_for_messages() |
110 | 130 |
|
111 | 131 | # verify the correct messages were sent
|
112 | 132 | self.assertEqual(data.table_operations, table_operations)
|
@@ -134,5 +154,30 @@ def test_3001_deprecations(self):
|
134 | 154 | self.assertRaises(oracledb.ProgrammingError, connection.subscribe,
|
135 | 155 | client_initiated=True, clientInitiated=True)
|
136 | 156 |
|
| 157 | + @unittest.skip("multiple subscriptions cannot be created simultaneously") |
| 158 | + def test_3002_aq_subscription(self): |
| 159 | + "3002 - test subscription for AQ" |
| 160 | + |
| 161 | + # create queue and clear it of all messages |
| 162 | + queue = self.connection.queue("TEST_RAW_QUEUE") |
| 163 | + queue.deqoptions.wait = oracledb.DEQ_NO_WAIT |
| 164 | + while queue.deqone(): |
| 165 | + pass |
| 166 | + self.connection.commit() |
| 167 | + |
| 168 | + # set up subscription |
| 169 | + data = AQSubscriptionData(1) |
| 170 | + connection = test_env.get_connection(events=True) |
| 171 | + sub = connection.subscribe(namespace=oracledb.SUBSCR_NAMESPACE_AQ, |
| 172 | + name=queue.name, timeout=10, |
| 173 | + callback=data.callback_handler) |
| 174 | + |
| 175 | + # enqueue a message |
| 176 | + queue.enqone(self.connection.msgproperties(payload="Some data")) |
| 177 | + self.connection.commit() |
| 178 | + |
| 179 | + # wait for all messages to be sent |
| 180 | + data.wait_for_messages() |
| 181 | + |
137 | 182 | if __name__ == "__main__":
|
138 | 183 | test_env.run_test_cases()
|
0 commit comments