Rest & Batch consolidation #16

Merged: 18 commits, Dec 15, 2022
docs/basics.md: 2 changes (1 addition, 1 deletion)
@@ -5,7 +5,7 @@ This document covers some basic knowledge around External Adapters, what they are
## What's the purpose of EAs?

- Abstract provider specific details, specifically:
- Transport (REST, WebSockets, RPC, SSE, etc.)
- Transport (HTTP, WebSockets, RPC, SSE, etc.)
- Authentication (login flows, keys)
- Accept normalized request payloads and translate into the provider's interface (this also includes things like symbols/tickers)
- Parse provider responses into the desired data points (e.g. price from crypto endpoint)
docs/porting-a-v2-ea-to-v3.md: 121 changes (67 additions, 54 deletions)
@@ -12,31 +12,23 @@ Checklist when moving an EA from v2 to v3:

Each endpoint you port over will have its own Transport. Transports are what the EA uses to connect to the DP (you can read more about how they work [here](../basics.md#ea-v3-design)). In v2, it
was commonly Endpoints that connected to the DP; in v3, however, this has been pushed one level down, with the Endpoint
only being responsible to route to the transport. An EA that supports both WS and HTTP will have both a
**RestTransport**/**BatchWarmingTransport** and a **WebSocketTransport**.
only being responsible to route to the transport. An EA that supports both WS and HTTP will have both an
**HttpTransport** and a **WebSocketTransport**.

**Select the right Transport to use:**

- **WebSocketTransport**
Use for WS connections to the DP
- **BatchWarmingTransport**
Use this transport when the DP endpoint allows multiple EA requests to be served in a single HTTP request to the DP.
Example: a price API that allows you to query: `/price?from=ETH,BTC,LINK&to=USD`
- **RestTransport**
Use this transport when the endpoint only allows a single EA request to be served for each HTTP request to the DP.
Example: a price API with the query: `/price/ETH/USD`
> **Warning**
> The RestTransport in its current form will be deprecated. If the DP supports batching, use the
> BatchWarmingTransport instead. You can proceed with the RestTransport if the EA is expected to have very low
> throughput (i.e. one request a day/hour). If not, **you should not migrate the EA to v3 at this time**.
- **HttpTransport**
Use for sending HTTP requests to the DP

If a single endpoint supports multiple transports, these transports should be wrapped in a **RoutingTransport**. An
example implementation:

```typescript
export const routingTransport = new RoutingTransport<CryptoEndpointTypes>(
{
HTTP: restTransport,
HTTP: httpTransport,
WS: wsTransport,
},
(req) => {
@@ -60,10 +52,10 @@ adapter
│ │ └─ RoutingTransport // A single routing transport that wraps multiple
│ │ │ // underlying transports to ensure types and
│ │ │ // input params stay consistent.
│ │ ├─ BatchWarmingTransport
│ │ ├─ HttpTransport
│ │ └─ WebSocketTransport
│ └─ volume // Input: {"endpoint": "volume"}
│ └─ RestTransport
│ └─ HttpTransport
└─ index // References endpoints, rate limit tiers, custom settings, etc.
```
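For orientation, a hypothetical sketch of how an endpoint in this layout wires up its transport is shown below; the constructor options (`name`, `transport`, `inputParameters`) and the import path are assumptions for illustration, not an excerpt from the framework:

```typescript
import { AdapterEndpoint } from '@chainlink/external-adapter-framework/adapter'

// Hypothetical: a crypto endpoint that delegates every request to the routing transport above.
export const endpoint = new AdapterEndpoint<CryptoEndpointTypes>({
  name: 'crypto', // Selected via {"endpoint": "crypto"} in the EA request
  transport: routingTransport, // Routes each request to the HttpTransport or WebSocketTransport
  inputParameters, // Normalized input params shared by every transport on this endpoint
})
```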

Expand All @@ -81,36 +73,39 @@ export type EndpointTypes = {
Provider: {
... // Provider-specific details; these differ and are defined by each Transport implementation
}
}
}
```

These types will also be shared with the Endpoint that’s referencing this transport.
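As a hypothetical, filled-in version of the shape above for a crypto price endpoint (the exact members, especially under `Provider`, vary by transport and are assumptions here):

```typescript
// Hypothetical sketch; field names under `Provider` assume an HTTP transport.
export type EndpointTypes = {
  Request: {
    Params: { base: string; quote: string } // Normalized EA input params
  }
  Response: {
    Data: { result: number } // Body the EA returns
    Result: number // Singular result value
  }
  CustomSettings: SettingsMap // Adapter settings/config type
  Provider: {
    RequestBody: never // This DP endpoint is queried with URL params only
    ResponseBody: {
      [base: string]: { [quote: string]: { price: number } }
    }
  }
}
```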

## Building a REST/Batch transport
## Building an HTTP Transport

Building a REST mainly consists of defining types (above), and defining how to build the request and parse the response
to and from the DP.
Building an HTTP transport mainly consists of defining the types (above) and defining how to build the request to the DP and how to parse its response. An example that sends requests to a non-batch endpoint:

```typescript
const restTransport = new RestTransport<EndpointTypes>({
prepareRequest: (req, config) => {
// The `req` param contains the request made to the EA.
const httpTransport = new HttpTransport<EndpointTypes>({
prepareRequests: (params, config) => {
// The `params` param contains all the requests made to the EA that need data fetched from the DP.
// Using this, return the request config to the DP.
return {
baseURL: config.API_ENDPOINT || DEFAULT_API_ENDPOINT,
url: '/price',
method: 'GET',
params: {
from: req.base,
to: req.quote,
},
}
return params.map(req => ({
params: req,
request: {
baseURL: config.API_ENDPOINT, // Default endpoint defined in configs
url: '/price',
method: 'GET',
params: {
from: req.base,
to: req.quote,
},
}
    }))
},
parseResponse: (req, res) => {
// The `req` param contains the request made to the EA.
parseResponse: (params, res) => {
// The `params` param contains the requests made to the EA that correspond to this DP response.
// The `res` param contains the response from the DP.
// Using this, parse the resulting value we want to return.
const req = params[0] // Since this endpoint only covers one pair at a time
return {
data: res,
// Assuming res = { [base]: { [quote]: { price: number } } }
@@ -120,45 +115,63 @@ const restTransport = new RestTransport<EndpointTypes>({
})
```

A batch warming transport is not much different. Instead of receiving a single `req` in `prepareRequest()` and
`parseResponse()`, you now receive an array of requests in `params`:
An example for a batch endpoint is not much different:

```typescript
const batchTransport = new BatchWarmingTransport<EndpointTypes>({
const httpTransport = new HttpTransport<EndpointTypes>({
  prepareRequests: (params, config) => {
    // The `params` param contains an array of the requests made to the EA.
    // Using this, return the request config to the DP.
return {
baseURL: config.API_ENDPOINT || DEFAULT_API_ENDPOINT,
url: '/price',
method: 'GET',
params: {
from: params.map((req) => req.base).join(','),
to: params.map((req) => req.quote).join(','),
},
params,
request: {
baseURL: config.API_ENDPOINT, // Default endpoint defined in configs
url: '/price',
method: 'GET',
params: {
from: params.map((req) => req.base).join(','),
to: params.map((req) => req.quote).join(','),
},
}
}
},
parseResponse: (params, res) => {
    // The `params` param contains an array of the requests made to the EA.
// The `res` param contains the response from the DP.
    // Using this, return an array of each request-response combination:
// return [
// {
// params: { base: "ETH", quote: "USD" }
// value: 123.45
// },
// {
// params: { base: "BTC", quote: "USD" }
// value: 123.45
// }
// ]
    // Assuming the DP body is res.data = { [base]: { [quote]: { price: number, timestamp: number } } }
    return [
      {
        params: { base: "ETH", quote: "USD" },
        response: {
          result: res.data.ETH.USD.price,
          data: {
            result: res.data.ETH.USD.price,
          },
          timestamps: {
            providerIndicatedTime: res.data.ETH.USD.timestamp,
          },
        },
      },
      {
        params: { base: "BTC", quote: "USD" },
        response: {
          result: res.data.BTC.USD.price,
          data: {
            result: res.data.BTC.USD.price,
          },
          timestamps: {
            providerIndicatedTime: res.data.BTC.USD.timestamp,
          },
        },
      },
    ]
},
})
```

## Building a WebSocket Transport

A WebSocket transport is a bit different from REST/Batch transports, but also boils down to two things: forming a
A WebSocket transport is a bit different from the HttpTransport, but also boils down to two things: forming a
request to the DP and parsing messages into results to be stored in the cache.

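The doc's full WebSocket example is collapsed in this diff view. A minimal hypothetical sketch follows; the option names (`url`, `handlers.message`, `builders.subscribeMessage`/`unsubscribeMessage`), the message shape, and the `WS_API_ENDPOINT` setting are assumptions rather than a verbatim excerpt:

```typescript
// Hypothetical sketch of a WebSocketTransport; option and field names are assumptions.
const wsTransport = new WebSocketTransport<EndpointTypes>({
  // Build the WS connection URL from adapter config
  url: (context) => context.adapterConfig.WS_API_ENDPOINT,
  handlers: {
    // Parse each incoming DP message into results keyed by the EA params they answer
    message: (message) => {
      if (!message.price) {
        return [] // Ignore heartbeats and other non-price messages
      }
      return [
        {
          params: { base: message.from, quote: message.to },
          response: {
            result: message.price,
            data: { result: message.price },
          },
        },
      ]
    },
  },
  builders: {
    // Form the subscribe/unsubscribe messages sent to the DP for each EA request
    subscribeMessage: (params) => ({ action: 'subscribe', pair: `${params.base}/${params.quote}` }),
    unsubscribeMessage: (params) => ({ action: 'unsubscribe', pair: `${params.base}/${params.quote}` }),
  },
})
```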
package.json: 6 changes (3 additions, 3 deletions)
@@ -32,11 +32,11 @@
"@types/node": "18.11.11",
"@types/sinonjs__fake-timers": "8.1.2",
"@types/ws": "8.5.3",
"@typescript-eslint/eslint-plugin": "5.39.0",
"@typescript-eslint/parser": "5.45.1",
"@typescript-eslint/eslint-plugin": "5.46.0",
"@typescript-eslint/parser": "5.46.0",
"ava": "4.3.3",
"c8": "7.12.0",
"eslint": "8.24.0",
"eslint": "8.29.0",
"eslint-config-prettier": "8.5.0",
"eslint-plugin-tsdoc": "0.2.17",
"mock-socket": "9.1.5",
src/adapter/basic.ts: 15 changes (6 additions, 9 deletions)
@@ -12,7 +12,6 @@ import {
import * as transportMetrics from '../transports/metrics'
import {
buildRateLimitTiersFromConfig,
FixedFrequencyRateLimiter,
getRateLimitingTier,
highestRateLimitTiers,
SimpleCountingRateLimiter,
@@ -30,6 +29,7 @@ import {
RequestTransform,
} from './types'
import { AdapterTimeoutError } from '../validation/error'
import { Requester } from '../util/requester'

const logger = makeLogger('Adapter')

@@ -314,14 +314,8 @@ export class Adapter<CustomSettings extends CustomAdapterSettings = SettingsMap>
value is higher than the highest tier value from limits.json ${highestTierValue}`)
}

if (!dependencies.requestRateLimiter) {
dependencies.requestRateLimiter = new SimpleCountingRateLimiter().initialize(
this.endpoints,
rateLimitingTier,
)
}
if (!dependencies.backgroundExecuteRateLimiter) {
dependencies.backgroundExecuteRateLimiter = new FixedFrequencyRateLimiter().initialize(
if (!dependencies.rateLimiter) {
dependencies.rateLimiter = new SimpleCountingRateLimiter().initialize(
this.endpoints,
rateLimitingTier,
)
@@ -333,6 +327,9 @@ export class Adapter<CustomSettings extends CustomAdapterSettings = SettingsMap>
dependencies.redisClient,
)
}
if (!dependencies.requester) {
dependencies.requester = new Requester(dependencies.rateLimiter, this.config as AdapterConfig)
}

return dependencies as AdapterDependencies
}
src/adapter/types.ts: 15 changes (6 additions, 9 deletions)
@@ -2,13 +2,10 @@ import type EventSource from 'eventsource'
import Redis from 'ioredis'
import { Cache } from '../cache'
import { AdapterConfig, BaseAdapterConfig, SettingsMap } from '../config'
import {
AdapterRateLimitTier,
BackgroundExecuteRateLimiter,
RequestRateLimiter,
} from '../rate-limiting'
import { AdapterRateLimitTier, RateLimiter } from '../rate-limiting'
import { Transport, TransportGenerics } from '../transports'
import { AdapterRequest, RequestGenerics, SubscriptionSetFactory } from '../util'
import { Requester } from '../util/requester'
import { InputParameters } from '../validation'
import { AdapterError } from '../validation/error'
import { Adapter } from './basic'
@@ -27,10 +24,7 @@ export interface AdapterDependencies {
cache: Cache

/** Shared instance of the request rate limiter */
requestRateLimiter: RequestRateLimiter

/** Shared instance of the background execute rate limiter */
backgroundExecuteRateLimiter: BackgroundExecuteRateLimiter
rateLimiter: RateLimiter

/** Factory to create subscription sets based on the specified cache type */
subscriptionSetFactory: SubscriptionSetFactory
@@ -40,6 +34,9 @@

/** EventSource to use for listening to server sent events. A mock EventSource can be provided as a dependency for testing */
eventSource: typeof EventSource

/** Shared instance to handle sending http requests in a centralized fashion */
requester: Requester
}

/**
src/background-executor.ts: 15 changes (7 additions, 8 deletions)
@@ -1,4 +1,4 @@
import { Adapter, EndpointContext, AdapterEndpoint, EndpointGenerics } from './adapter'
import { Adapter, AdapterEndpoint, EndpointContext, EndpointGenerics } from './adapter'
import * as metrics from './metrics'
import { MetaTransport, Transport, TransportGenerics } from './transports'
import { makeLogger } from './util'
@@ -17,8 +17,6 @@ export async function callBackgroundExecutes(adapter: Adapter, apiShutdownPromise
// If no server is provided, the listener won't be set and serverClosed will always be false
let serverClosed = false

const rateLimiter = adapter.dependencies.backgroundExecuteRateLimiter

const timeoutsMap: {
[endpointName: string]: NodeJS.Timeout
} = {}
@@ -69,13 +67,14 @@ export async function callBackgroundExecutes(adapter: Adapter, apiShutdownPromise
} catch (error) {
logger.error(error)
}
const timeToWait = rateLimiter.msUntilNextExecution(endpoint.name)
logger.debug(
`Finished background execute for endpoint "${endpoint.name}", sleeping for ${timeToWait}ms`,
)

    // This background execute loop no longer determines the sleep between background execute calls.
    // That is now the responsibility of each transport, allowing custom transports to implement their own timings.
logger.trace(
`Finished background execute for endpoint "${endpoint.name}", calling it again in 1ms...`,
)
metricsTimer()
timeoutsMap[endpoint.name] = setTimeout(handler, timeToWait)
timeoutsMap[endpoint.name] = setTimeout(handler, 1)
}

// Start recursive async calls