//! document_entries_00.rs
//!
//! Example: create, list (paged and streamed), fetch, replace and delete
//! documents in an Azure Cosmos DB collection.
use azure_core::prelude::*;
use azure_cosmos::prelude::*;
use azure_cosmos::responses::GetDocumentResponse;
use futures::stream::StreamExt;
use serde::{Deserialize, Serialize};
use std::borrow::Cow;
use std::error::Error;
use std::sync::Arc;
// Sample payload stored in every document of this example.
//
// The string fields are `Cow`s so one struct serves both directions:
// when serializing we can borrow the data (no ownership needed), and
// when deserializing the struct can own what it received.
//
// The "id" field is part of the payload and is also the value this
// example passes as the partition key when creating documents.
#[derive(Clone, Debug, Deserialize, Serialize)]
struct MySampleStruct<'a> {
    // Document id; this example fills it with "unique_id<N>".
    id: Cow<'a, str>,
    // Free-form text; rewritten by the "replace document" step.
    a_string: Cow<'a, str>,
    // Set to the loop index of the document when it is created.
    a_number: u64,
    // Set from `chrono::Utc::now().timestamp()` at creation time.
    a_timestamp: i64,
}
// This example expects you to have created a collection
// with partitionKey on "id".
//
// Command line: <database name> <collection name>.
// Environment:  COSMOS_MASTER_KEY and COSMOS_ACCOUNT must be set.
//
// The run creates a handful of documents, lists them (first page by page
// with an explicit continuation token, then through the stream helper),
// fetches and replaces one of them, looks up an id that does not exist and
// finally deletes everything it created.
//
// Any error returned by the service is propagated with `?`; failed
// expectations about the responses abort the run through `assert!`.
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
    // Number of sample documents. The creation and clean-up loops must agree
    // on it, so it is defined once.
    const DOCUMENT_COUNT: u64 = 5;

    // Single definition of the id scheme shared by the creation, lookup and
    // clean-up steps.
    fn document_id(index: u64) -> String {
        format!("unique_id{}", index)
    }

    let database_name = std::env::args()
        .nth(1)
        .expect("please specify database name as first command line parameter");
    let collection_name = std::env::args()
        .nth(2)
        .expect("please specify collection name as second command line parameter");

    let master_key =
        std::env::var("COSMOS_MASTER_KEY").expect("Set env variable COSMOS_MASTER_KEY first!");
    let account = std::env::var("COSMOS_ACCOUNT").expect("Set env variable COSMOS_ACCOUNT first!");

    let authorization_token = permission::AuthorizationToken::primary_from_base64(&master_key)?;

    let http_client: Arc<Box<dyn HttpClient>> = Arc::new(Box::new(reqwest::Client::new()));
    let client = CosmosClient::new(http_client, account, authorization_token);
    // Narrow the client down to the collection; each `into_*` call consumes
    // the previous client, hence the shadowing.
    let client = client.into_database_client(database_name);
    let client = client.into_collection_client(collection_name);

    // Keep the last creation response: it is handed to the first list call
    // as its consistency level.
    let mut response = None;
    for i in 0..DOCUMENT_COUNT {
        let doc = Document::new(MySampleStruct {
            id: Cow::Owned(document_id(i)),
            a_string: Cow::Borrowed("Something here"),
            a_number: i,
            a_timestamp: chrono::Utc::now().timestamp(),
        });

        // let's add an entity.
        response = Some(
            client
                .create_document()
                .partition_keys([&doc.document.id])
                .execute(&doc)
                .await?,
        );
    }
    println!("Created {} documents.", DOCUMENT_COUNT);

    // Let's get 3 entries at a time.
    let response = client
        .list_documents()
        .consistency_level(response.expect("DOCUMENT_COUNT > 0, so a document was created"))
        .max_item_count(3i32)
        .execute::<MySampleStruct>()
        .await?;
    assert_eq!(response.documents.len(), 3);
    println!("response == {:#?}", response);

    // we inserted 5 documents and retrieved the first 3.
    // continuation_token must be present
    assert!(response.continuation_token.is_some());

    let session_token = &response;
    let ct = response.continuation_token.clone().unwrap();
    println!("ct == {}", ct);

    let response = client
        .list_documents()
        .consistency_level(session_token)
        .continuation(ct.as_str())
        .execute::<MySampleStruct>()
        .await?;
    assert_eq!(response.documents.len(), 2);
    println!("response == {:#?}", response);

    // we got the last 2 entries. Now continuation_token
    // must be absent
    assert!(response.continuation_token.is_none());

    // we can have Rust pass the continuation_token for
    // us if we call the stream function. Here we
    // ask for 3 items at a time but of course you don't have to do that, the
    // stream function will work regardless of the limits imposed.
    let session_token: ConsistencyLevel = (&response).into();
    {
        println!("\nStreaming documents");
        let stream = client
            .list_documents()
            .consistency_level(session_token.clone())
            .max_item_count(3);
        let mut stream = Box::pin(stream.stream::<MySampleStruct>());
        // TODO: As soon as the streaming functionality is completed
        // in Rust substitute this while let Some... into
        // for each (or whatever the Rust team picks).
        while let Some(res) = stream.next().await {
            let res = res?;
            println!("Received {} documents in one batch!", res.documents.len());
        }
    }

    println!("\n\nLooking for a specific item");
    let id = document_id(3);
    let partition_keys = PartitionKeys::from([&id]);

    // `id` and `partition_keys` are cloned here because both are needed
    // again by the replace call below.
    let response = client
        .clone()
        .into_document_client(id.clone(), partition_keys.clone())
        .get_document()
        .consistency_level(session_token)
        .execute::<MySampleStruct>()
        .await?;

    assert!(matches!(response, GetDocumentResponse::Found(_)));
    println!("response == {:#?}", response);

    // Borrow through `ref` so `response` stays usable for the consistency
    // level of the replace call.
    let mut doc = match response {
        GetDocumentResponse::Found(ref resp) => resp.clone(),
        GetDocumentResponse::NotFound(_) => {
            panic!("document {} was just created and must be found", id)
        }
    };
    doc.document.document.a_string = "Something else here".into();

    println!("\n\nReplacing document");
    let replace_document_response = client
        .replace_document(&id)
        .partition_keys(partition_keys)
        .consistency_level(ConsistencyLevel::from(&response))
        .if_match_condition(IfMatchCondition::Match(&doc.etag)) // use optimistic concurrency check
        .execute(&doc.document)
        .await?;
    println!(
        "replace_document_response == {:#?}",
        replace_document_response
    );

    // This id should not be found. We expect
    // GetDocumentResponse::NotFound as result.
    println!("\n\nLooking for non-existing item");
    let id = document_id(100);
    let partition_keys = PartitionKeys::from([&id]);

    // `id` is not needed afterwards, so it is moved instead of cloned.
    let response = client
        .clone()
        .into_document_client(id, partition_keys)
        .get_document()
        .consistency_level(&response)
        .execute::<MySampleStruct>()
        .await?;

    assert!(matches!(response, GetDocumentResponse::NotFound(_)));
    println!("response == {:#?}", response);

    // Clean up every document created at the beginning of the run.
    for i in 0..DOCUMENT_COUNT {
        let id = document_id(i);
        let partition_keys = PartitionKeys::from([&id]);
        client
            .clone()
            .into_document_client(id, partition_keys)
            .delete_document()
            .consistency_level(&response)
            .execute()
            .await?;
    }
    println!("Cleaned up");

    Ok(())
}