1
+ import asyncio
2
+ from typing import TypedDict
3
+ import httpx
4
+ import os
5
+ import json
6
+ import websockets
7
+
8
+ from .price_feeds import Price
9
+
10
+ HERMES_ENDPOINT_HTTPS = "https://hermes.pyth.network/"
11
+ HERMES_ENDPOINT_WSS = "wss://hermes.pyth.network/ws"
12
+
13
+
14
+ class PriceFeed (TypedDict ):
15
+ feed_id : str
16
+ price : Price
17
+ ema_price : Price
18
+ update_data : list [str ]
19
+
20
+
21
+ def parse_unsupported_version (version ):
22
+ if isinstance (version , int ):
23
+ raise ValueError ("Version number {version} not supported" )
24
+ else :
25
+ raise TypeError ("Version must be an integer" )
26
+
27
+
28
+ class HermesClient :
29
+ def __init__ (self , feed_ids : list [str ], endpoint = HERMES_ENDPOINT_HTTPS , ws_endpoint = HERMES_ENDPOINT_WSS , feed_batch_size = 100 ):
30
+ self .feed_ids = feed_ids
31
+ self .pending_feed_ids = feed_ids
32
+ self .prices_dict : dict [str , PriceFeed ] = {}
33
+ self .endpoint = endpoint
34
+ self .ws_endpoint = ws_endpoint
35
+ self .feed_batch_size = feed_batch_size # max number of feed IDs to query at once in https requests
36
+
37
+ async def get_price_feed_ids (self ) -> list [str ]:
38
+ """
39
+ Queries the Hermes https endpoint for a list of the IDs of all Pyth price feeds.
40
+ """
41
+
42
+ url = os .path .join (self .endpoint , "api/price_feed_ids" )
43
+
44
+ async with httpx .AsyncClient () as client :
45
+ data = (await client .get (url )).json ()
46
+
47
+ return data
48
+
49
+ def add_feed_ids (self , feed_ids : list [str ]):
50
+ self .feed_ids += feed_ids
51
+ self .feed_ids = list (set (self .feed_ids ))
52
+ self .pending_feed_ids += feed_ids
53
+
54
+ @staticmethod
55
+ def extract_price_feed_v1 (data : dict ) -> PriceFeed :
56
+ """
57
+ Extracts PriceFeed object from the v1 JSON response (individual price feed) from Hermes.
58
+ """
59
+ price = Price .from_dict (data ["price" ])
60
+ ema_price = Price .from_dict (data ["ema_price" ])
61
+ update_data = data ["vaa" ]
62
+ price_feed = {
63
+ "feed_id" : data ["id" ],
64
+ "price" : price ,
65
+ "ema_price" : ema_price ,
66
+ "update_data" : [update_data ],
67
+ }
68
+ return price_feed
69
+
70
+ @staticmethod
71
+ def extract_price_feed_v2 (data : dict ) -> list [PriceFeed ]:
72
+ """
73
+ Extracts PriceFeed objects from the v2 JSON response (array of price feeds) from Hermes.
74
+ """
75
+ update_data = data ["binary" ]["data" ]
76
+
77
+ price_feeds = []
78
+
79
+ for feed in data ["parsed" ]:
80
+ price = Price .from_dict (feed ["price" ])
81
+ ema_price = Price .from_dict (feed ["ema_price" ])
82
+ price_feed = {
83
+ "feed_id" : feed ["id" ],
84
+ "price" : price ,
85
+ "ema_price" : ema_price ,
86
+ "update_data" : update_data ,
87
+ }
88
+ price_feeds .append (price_feed )
89
+
90
+ return price_feeds
91
+
92
+ async def get_pyth_prices_latest (self , feedIds : list [str ], version = 2 ) -> list [PriceFeed ]:
93
+ """
94
+ Queries the Hermes https endpoint for the latest price feeds for a list of Pyth feed IDs.
95
+ """
96
+ if version == 1 :
97
+ url = os .path .join (self .endpoint , "api/latest_price_feeds" )
98
+ params = {"ids[]" : feedIds , "binary" : "true" }
99
+ elif version == 2 :
100
+ url = os .path .join (self .endpoint , "v2/updates/price/latest" )
101
+ params = {"ids[]" : feedIds , "encoding" : "base64" , "parsed" : "true" }
102
+ else :
103
+ parse_unsupported_version (version )
104
+
105
+ async with httpx .AsyncClient () as client :
106
+ data = (await client .get (url , params = params )).json ()
107
+
108
+ if version == 1 :
109
+ results = []
110
+ for res in data :
111
+ price_feed = self .extract_price_feed_v1 (res )
112
+ results .append (price_feed )
113
+ elif version == 2 :
114
+ results = self .extract_price_feed_v2 (data )
115
+
116
+ return results
117
+
118
+ async def get_pyth_price_at_time (self , feed_id : str , timestamp : int , version = 2 ) -> PriceFeed :
119
+ """
120
+ Queries the Hermes https endpoint for the price feed for a Pyth feed ID at a given timestamp.
121
+ """
122
+ if version == 1 :
123
+ url = os .path .join (self .endpoint , "api/get_price_feed" )
124
+ params = {"id" : feed_id , "publish_time" : timestamp , "binary" : "true" }
125
+ elif version == 2 :
126
+ url = os .path .join (self .endpoint , f"v2/updates/price/{ timestamp } " )
127
+ params = {"ids[]" : [feed_id ], "encoding" : "base64" , "parsed" : "true" }
128
+ else :
129
+ parse_unsupported_version (version )
130
+
131
+ async with httpx .AsyncClient () as client :
132
+ data = (await client .get (url , params = params )).json ()
133
+
134
+ if version == 1 :
135
+ price_feed = self .extract_price_feed_v1 (data )
136
+ elif version == 2 :
137
+ price_feed = self .extract_price_feed_v2 (data )[0 ]
138
+
139
+ return price_feed
140
+
141
+ async def get_all_prices (self , version = 2 ) -> dict [str , PriceFeed ]:
142
+ """
143
+ Queries the Hermes http endpoint for the latest price feeds for all feed IDs in the class object.
144
+
145
+ There is a limit on the number of feed IDs that can be queried at once, so this function queries the feed IDs in batches.
146
+ """
147
+ pyth_prices_latest = []
148
+ i = 0
149
+ while len (self .feed_ids [i : i + self .feed_batch_size ]) > 0 :
150
+ pyth_prices_latest += await self .get_pyth_prices_latest (
151
+ self .feed_ids [i : i + self .feed_batch_size ],
152
+ version = version ,
153
+ )
154
+ i += self .feed_batch_size
155
+
156
+ return dict ([(feed ['feed_id' ], feed ) for feed in pyth_prices_latest ])
157
+
158
+ async def ws_pyth_prices (self , version = 1 ):
159
+ """
160
+ Opens a websocket connection to Hermes for latest prices for all feed IDs in the class object.
161
+ """
162
+ if version != 1 :
163
+ parse_unsupported_version (version )
164
+
165
+ async with websockets .connect (self .ws_endpoint ) as ws :
166
+ while True :
167
+ # add new price feed ids to the ws subscription
168
+ if len (self .pending_feed_ids ) > 0 :
169
+ json_subscribe = {
170
+ "ids" : self .pending_feed_ids ,
171
+ "type" : "subscribe" ,
172
+ "verbose" : True ,
173
+ "binary" : True ,
174
+ }
175
+ await ws .send (json .dumps (json_subscribe ))
176
+ self .pending_feed_ids = []
177
+
178
+ msg = json .loads (await ws .recv ())
179
+ if msg .get ("type" ) == "response" :
180
+ if msg .get ("status" ) != "success" :
181
+ raise Exception ("Error in subscribing to websocket" )
182
+ try :
183
+ if msg ["type" ] != "price_update" :
184
+ continue
185
+
186
+ feed_id = msg ["price_feed" ]["id" ]
187
+ new_feed = msg ["price_feed" ]
188
+
189
+ self .prices_dict [feed_id ] = self .extract_price_feed_v1 (new_feed )
190
+
191
+ except Exception as e :
192
+ raise Exception (f"Error in price_update message: { msg } " ) from e
0 commit comments