
Support for server-sent events #234

Closed
fitzgen opened this issue May 18, 2019 · 16 comments · Fixed by #456
Labels
design Open design question

Comments

@fitzgen

fitzgen commented May 18, 2019

Feature Request

Support for server-sent events in Tide apps.

Detailed Description

Support for easily creating server-sent event streams with Tide.

Context

I would like to use server-sent events. I suppose other folks might want to as well.

Possible Implementation

Ideally, I would be able to have some method on Response, or some sort of type that implements IntoResponse, that sets the Content-Type: text/event-stream header for me and takes a Stream<Item = ServerSentEvent>, where ServerSentEvent is a struct with an optional event type string and a data string.

Maybe it would automatically send heartbeats too, to keep the connection alive?
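
For illustration, a minimal sketch of what that struct and its wire encoding could look like (names and shape are just a guess at the proposal, plain Rust):

/// Hypothetical shape of the proposed struct: an optional event name plus data.
struct ServerSentEvent {
    event: Option<String>,
    data: String,
}

impl ServerSentEvent {
    /// Encode into the text/event-stream wire format: an optional `event:`
    /// field, one `data:` field per line, and a blank line as terminator.
    fn encode(&self) -> String {
        let mut out = String::new();
        if let Some(event) = &self.event {
            out.push_str("event: ");
            out.push_str(event);
            out.push('\n');
        }
        for line in self.data.lines() {
            out.push_str("data: ");
            out.push_str(line);
            out.push('\n');
        }
        out.push('\n');
        out
    }
}

fn main() {
    let ev = ServerSentEvent {
        event: Some("message".into()),
        data: "hello world".into(),
    };
    print!("{}", ev.encode());
}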

@yoshuawuyts
Member

Reference from a Node.js implementation I did a while back:

Headers

      'Content-Type': 'text/event-stream',
      'X-Accel-Buffering': 'no',
      'Cache-Control': 'no-cache'

heartbeat

var interval = setInterval(function () {
  if (res.finished) return // prevent writes after stream has closed
  res.write(`id:${id++}\ndata:{ "type": "heartbeat" }\n\n`)
}, 4000)

data

In particular, making sure to increment the id on every tick is useful because Chrome's debug tooling picks up on this and does a better job of displaying individual events in the network tab.

var msg = JSON.stringify({ type: 'scripts' })
res.write(`id:${id++}\ndata:${msg}\n\n`)

Screenshot

What it looks like to run this:

[screenshot: 2019-05-19-152557_1920x1080]

@pickfire
Contributor

pickfire commented Aug 4, 2019

I believe people can just swap ServerSentEvent with the data they desire in Stream<Item = ServerSentEvent>.

In some cases it may be better to return JSON rather than plain text; I wonder if Stream<Item = Json<Data>> would help?

By the way, I believe this should be marked as design, as it may be similar to #67.

@yoshuawuyts yoshuawuyts added the design Open design question label Aug 5, 2019
@yoshuawuyts
Member

@pickfire the output stream should probably take an AsyncRead and produce an AsyncRead. It can then decorate the incoming messages with the right SSE framing, and also produce regular heartbeat messages.

In general we should probably move the http-service interface to use AsyncRead for both requests and responses rather than the existing pre-allocated streams that we're exposing.
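
A minimal sketch of the producing half of that idea, assuming the futures crate's AsyncRead trait (SseBody and its fields are made up for illustration; a real decorator would poll an inner message stream, frame each message on the fly, and inject heartbeats):

use std::pin::Pin;
use std::task::{Context, Poll};

use futures::io::AsyncRead;

/// Hypothetical body type: holds already-framed SSE bytes and hands them
/// out through AsyncRead.
struct SseBody {
    framed: Vec<u8>,
    pos: usize,
}

impl AsyncRead for SseBody {
    fn poll_read(
        mut self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<std::io::Result<usize>> {
        let n = (self.framed.len() - self.pos).min(buf.len());
        buf[..n].copy_from_slice(&self.framed[self.pos..self.pos + n]);
        self.pos += n;
        Poll::Ready(Ok(n)) // n == 0 once exhausted, signalling EOF
    }
}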

@yoshuawuyts
Member

API

I've sketched out an API we could use for Tide:

use async_std::task;

fn main() -> Result<(), std::io::Error> {
    task::block_on(async {
        let mut app = tide::new();
        app.at("/sse").get(tide::sse()); // shorthand function, we might also want to provide a builder
        app.at("/", async |req| {
            let mut sse = req.sse();
            println!("next message id: {}", sse.id());      // u64
            println!("sse status is: {}", sse.status());    // enum { Connecting, Open, Closed }
            println!("sse is open?: {}", sse.is_open());    // bool
            sse.send("message", b"hello world").await;      // send(&self, event: &str, data: &[u8])
            "hello world" // return some response
        });
        app.listen("127.0.0.1:8080").await?;
        Ok(())
    })
}

A single connection would be established per peer (when they make an EventSource connection to /sse in the example), and from there we'd keep it open with a keepalive until the client drops us.

The connection would be stored inside the app instance, and accessible as req.sse() for other connections from the same host. This has some security implications though (see next section).

We would probably also want to provide extra metadata on the sse object on Request. Info such as whether the connection is open, message id, etc. might be useful.

Security

We can't base the peer's identity purely on the incoming TCP connection, since we could be behind a proxy. Instead we should be aware of the x-forwarded-for and forwarded headers, and mention this as part of the security model.
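
For illustration, a tiny hypothetical helper (not a proposed API): x-forwarded-for carries a comma-separated list with the original client first, and is only trustworthy when set by a proxy we control:

/// Hypothetical helper: pick the original client out of an
/// x-forwarded-for value, which lists the client first, then each proxy.
fn client_from_forwarded_for(header: &str) -> Option<&str> {
    header.split(',').map(str::trim).find(|s| !s.is_empty())
}

fn main() {
    let header = "203.0.113.7, 198.51.100.2";
    assert_eq!(client_from_forwarded_for(header), Some("203.0.113.7"));
}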

References

This design is inspired by Phoenix's channels. The main difference is that we don't take care of the multiplexing aspects, but instead leave it up to the application author to decide how they want to use the connection. Phoenix provides a matching client library to handle the "channel"; I think we probably shouldn't do that.


cc/ @goto-bus-stop I'm curious what you think of this API

@pickfire
Contributor

        app.at("/sse").get(tide::sse()); // shorthand function, we might also want to provide a builder

Looking at that, it seems like only one server-sent events endpoint per server is expected. I believe SSE is usually one per server, but could there be multiple SSE endpoints?

How about this?

        app.at("/", async |req| {
            // assuming connection does not close in the block (not sure what happens if it does)
            // enum { Connecting, Open(SSEConnection), Closed }
            if let Open(mut sse) = req.sse() {
                println!("next message id: {}", sse.id()); // u64
                sse.send("message", b"hello world").await; // send(&self, event: &str, data: &[u8])
            }
            "hello world" // return some response
        });

@yoshuawuyts
Member

@pickfire the /sse endpoint would be for clients to connect on; there could be multiple endpoints for this, but I'm quite sure clients should only have a single SSE connection per server in most cases.

I like your idea of wrapping the SSE object inside an Option; that might perhaps be the best way to go about it.

@pickfire
Contributor

Should send return a Result? There is no guarantee that the SSE connection won't be dropped while inside the block.

        app.at("/", async |req| {
            // assuming connection does not close in the block (not sure what happens if it does)
            // enum { Connecting, Open(SSEConnection), Closed }
            if let Open(mut sse) = req.sse() {
                println!("next message id: {}", sse.id()); // u64
                // <- sse connection dropped here
                sse.send("message", b"hello world").await?; // result?
            }
            "hello world" // return some response
        });

I wonder if we should even have that block in the first place; there is no saying when the connection will be dropped.

By the way, Happy Chinese New Year! ^^

@yoshuawuyts
Member

@pickfire Happy new year to you too! I'm thinking we should also add a try_send method that returns immediately if the connection isn't open. Related issue: async-rs/async-std#585.

I've been thinking about what you said earlier, by the way, and I think it might be susceptible to race conditions. What happens if we need to send a message over SSE, but the connection is only established after we try to send it?

In fact I think we should support several different scenarios, and returning an Option<SSE> from req.sse() is probably not as flexible as we'd want it to be. Instead I think the original design might be a bit better suited here:

Examples

// Wait until we can send the message.
app.at("/", async |req| {
    req.sse().send("message", b"hello world").await;
    Response::new(200)
});

// Wait until we can send the message, but timeout after 5 secs.
app.at("/", async |req| {
    req.sse()
        .send("message", b"hello world")
        .timeout(Duration::from_secs(5))
        .await;
    Response::new(200)
});

// Error if we can't send the message immediately. This includes
// a full queue.
app.at("/", async |req| {
    req.sse().try_send("message", b"hello world")?.await;
    Response::new(200)
});

// Only send a message if we're already connected.
app.at("/", async |req| {
    let sse = req.sse();
    if sse.is_connected() {
        sse.send("message", b"hello world").await;
    }
    Response::new(200)
});
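
As an aside on try_send: a minimal illustration of the proposed "error if we can't send immediately" semantics, using std's bounded channels as a stand-in for the per-connection queue:

use std::sync::mpsc::{sync_channel, TrySendError};

fn main() {
    // Bounded queue of size 1, standing in for a per-connection send queue.
    let (tx, _rx) = sync_channel::<&str>(1);
    tx.try_send("first").unwrap(); // fits in the queue
    match tx.try_send("second") {
        // Returns immediately instead of waiting for room in the queue.
        Err(TrySendError::Full(_)) => println!("queue full, returned immediately"),
        other => println!("unexpected: {:?}", other),
    }
}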

@pickfire
Contributor

The other examples seem fair enough, but looking at the fourth example:

// Only send a message if we're already connected.
app.at("/", async |req| {
    let sse = req.sse();
    if sse.is_connected() {
        sse.send("message", b"hello world").await;
    }
    Response::new(200)
});

Isn't it possible that the SSE connection may be dropped right after the is_connected check?

@yoshuawuyts
Member

yoshuawuyts commented Jan 27, 2020

@pickfire accurate; it could be a temporary hiccup and the client could reconnect. Or perhaps it's an actual timeout and we should time out the whole request. We can't distinguish between these cases. Either way, waiting seems like the right behavior.

@pickfire
Contributor

Or is it possible that we could return a response first and then wait to send the SSE?

@yoshuawuyts
Member

@pickfire That's a possibility, though I see there might be difficulty in finding the right peer -- so it would need to take some context from Request and use it for Response. That might make for a difficult model.

I spent some time this week writing about the APIs here by the way: https://blog.yoshuawuyts.com/tide-channels/.

@goto-bus-stop
Member

goto-bus-stop commented Jan 31, 2020

That's an interesting API direction—not something I had thought of! I'll write down some of my thoughts based mostly on an actual realtime app I have that's currently using WebSockets, but that could just as well use EventSource. I'm not sure that this is a typical app so some of the features I'd like may not be that important in general.

edit: I wrote most of this a couple days ago, so I hadn't seen the post yet 🙃 It looks like it addresses a lot of the same things, which is probably a good sign!

I think the most common case is having one SSE endpoint, but it can make sense to have multiple for different types of resources, or something like a /thread/:id/posts-stream in forum software, where the number of endpoints is not known up front.

One thing is kinda unclear to me in the proposed API (edit: from the OP, but addressed in the blog post). If user A and B are connected to the SSE endpoint, and user B sends a request to /, what happens? Is this event broadcasted to both, is it only sent to A, or only sent to B?

There are a couple of scenarios where you might need to send something over an SSE connection. Sometimes you need to broadcast something to all clients, or to a subset, or to just one. Sometimes this may be directly in response to an HTTP request (then req.sse() is useful), but it could also be in response to any other event, like getting a new message from a message queue. Some of that could be achieved by allowing SSE connection instances to subscribe to a "topic", and then you can publish to that topic anywhere else in the app (whether in response to an HTTP request or anything else). An app could choose to subscribe to, say, a "globalChat" and a "$myUserId" topic to address the different granularities. I think something like that would probably be implemented with a bunch of channels in Rust, and it could be mostly independent of the actual HTTP/SSE implementation. There might be a crate out there that does it already...
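
As a rough illustration of the topic idea (all names hypothetical; std channels here, a real version would use async channels):

use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical topic registry: maps a topic name to the senders of all
/// subscribed SSE connections.
struct Topics {
    subscribers: HashMap<String, Vec<Sender<String>>>,
}

impl Topics {
    fn new() -> Self {
        Topics { subscribers: HashMap::new() }
    }

    /// Subscribe a connection; the returned receiver would feed its SSE body.
    fn subscribe(&mut self, topic: &str) -> Receiver<String> {
        let (tx, rx) = channel();
        self.subscribers.entry(topic.to_string()).or_default().push(tx);
        rx
    }

    /// Publish to every subscriber of a topic, dropping closed connections.
    fn publish(&mut self, topic: &str, msg: &str) {
        if let Some(subs) = self.subscribers.get_mut(topic) {
            subs.retain(|tx| tx.send(msg.to_string()).is_ok());
        }
    }
}

fn main() {
    let mut topics = Topics::new();
    let chat = topics.subscribe("globalChat");
    topics.publish("globalChat", "hello everyone");
    assert_eq!(chat.recv().unwrap(), "hello everyone");
}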

I think @fitzgen's proposed API is a good starting point regardless of what a higher level API would look like. If/when tide settles on a higher level API, having an escape hatch for the edge cases would still be valuable.

I think a higher level API could/should also attempt to make Last-Event-ID "just work", which would involve keeping some messages queued after a connection is lost. I don't know if this fits in very well with Tide, because you might want to store your messages elsewhere (I store them in Redis in my app to be somewhat resilient to server crashes and restarts). Without giving much thought to implementation, I think abstraction layers could look something like the list below; a rough sketch of the replay-buffer idea follows it:

  • IntoResponse for a stream of SSE messages
  • "Connection" abstraction that can handle reconnects with Last-Event-ID
  • Some pub/sub thing (and I think this could be where a tide::sse() comes in)
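
A rough sketch of the replay-buffer part of this (hypothetical names; in practice the queued messages could live in Redis, as mentioned above):

use std::collections::VecDeque;

/// Hypothetical replay buffer: keep the last `cap` framed events so a
/// client reconnecting with Last-Event-ID can catch up.
struct ReplayBuffer {
    events: VecDeque<(u64, String)>, // (event id, framed event text)
    cap: usize,
}

impl ReplayBuffer {
    fn new(cap: usize) -> Self {
        ReplayBuffer { events: VecDeque::new(), cap }
    }

    fn push(&mut self, id: u64, framed: String) {
        if self.events.len() == self.cap {
            self.events.pop_front();
        }
        self.events.push_back((id, framed));
    }

    /// Everything the client missed, given the id it sent on reconnect.
    fn since(&self, last_id: u64) -> impl Iterator<Item = &String> {
        self.events.iter().filter(move |(id, _)| *id > last_id).map(|(_, e)| e)
    }
}

fn main() {
    let mut buf = ReplayBuffer::new(16);
    buf.push(1, "id: 1\ndata: one\n\n".into());
    buf.push(2, "id: 2\ndata: two\n\n".into());
    // Client reconnects with `Last-Event-ID: 1` and receives only event 2.
    for event in buf.since(1) {
        print!("{}", event);
    }
}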

@yoshuawuyts
Member

Is this event broadcasted to both, is it only sent to A, or only sent to B?

I was thinking it'd be "broadcast to both" by default, but we'd need to have some (as of yet unspecified) escape hatch to more accurately target clients.


Overall I'd love to read more on what you're thinking. I feel you're speaking from more experience than I am, and on the surface level I'm agreeing with much of what you're saying. It sounds like there are four layers to any SSE API then:

  1. API to create a Response with an SSE body (as per @fitzgen's proposal)
  2. API to create an SSE endpoint (as per my post)
    • Unsure how useful this is if the first API is easy to use.
  3. Some way to expose req.sse() / req.session().sse() so that requests can use existing SSE connections to the same peer (where peer = remotes that match the current session).
  4. Some way to expose grouping / topics so that sending messages to a grouping of peers is possible.

And this should be able to be paired with ways to persist message ID sequences and queued messages on backing stores. This adds quite a few requirements, but they all seem legit.

I'm also wondering how the Connection abstraction would work as opposed to e.g. sessions (#9, #266). Some API sketches here would be fantastic.

@pickfire
Contributor

pickfire commented Feb 1, 2020

For server-sent events, heartbeats should be implemented using comments: lines beginning with a colon (for example ": heartbeat") that client implementations ignore.

I thought server-sent events automatically reconnect when they disconnect?

How about an event on disconnection? Maybe impl Drop for ServerSentEventsEndpoint or something similar to handle that?

impl ServerSentEvents {
    async fn send_json(&self, data: &impl Serialize) -> io::Result<()>;
}

Should we support only JSON, or a Serializer that allows the other formats serde supports?
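
For illustration, a minimal sketch of what framing a serde-serializable value could look like (json_event is a hypothetical helper, assuming serde with derive and serde_json):

use serde::Serialize;

/// Hypothetical helper: frame any serde-serializable value as an SSE event.
/// serde_json keeps output on one line, so a single `data:` field suffices.
fn json_event<T: Serialize>(event: &str, value: &T) -> serde_json::Result<String> {
    Ok(format!("event: {}\ndata: {}\n\n", event, serde_json::to_string(value)?))
}

#[derive(Serialize)]
struct Heartbeat {
    kind: &'static str,
}

fn main() -> serde_json::Result<()> {
    print!("{}", json_event("message", &Heartbeat { kind: "heartbeat" })?);
    Ok(())
}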

One more thing I noticed to be missing: how should we send server-sent events after a response has been sent to the client? So far the examples only send server-sent events while a client request is being handled. Maybe there are two more scenarios (see the sketch after this list)?

  1. sending server sent events after the response (or some sort of schedule)
  2. sending server sent events periodically
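
A minimal sketch of scenario 2, assuming async-std; a real version would write frames to the open SSE connection instead of stdout, and stop when the client disconnects:

use std::time::Duration;

use async_std::task;

fn main() {
    task::block_on(async {
        // Hypothetical periodic sender, bounded so the example terminates.
        for id in 0..3u64 {
            print!("id: {}\ndata: tick\n\n", id);
            task::sleep(Duration::from_secs(1)).await;
        }
    });
}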

Regarding the grouping and such: from what I have seen in Node.js or socket.io-like frameworks, there is an id linked to each channel, probably something like req.sse("id") where "id" should implement Hash. But yes, that is more than this proposal is going to handle.

It would be cool to explore prior art, i.e. how other languages and different frameworks implement this.

@yoshuawuyts
Member

Authored a lib for this today, primarily based on @goto-bus-stop's work: https://docs.rs/async-sse. The latest (2.1.0) version includes a channel-based encoder that should work nicely with the APIs that have been drafted so far.
