Streaming

Beta software

connect-python is in beta. 1.0 will include a new Protobuf implementation built from scratch by Buf, which may introduce breaking changes. Join us on Slack if you have questions or feedback.

Connect supports several types of streaming RPCs. Streaming is exciting — it's fundamentally different from the web's typical request-response model, and in the right circumstances it can be very efficient. If you've been writing the same pagination or polling code for years, streaming may look like the answer to all your problems.

Temper your enthusiasm. Streaming also comes with many drawbacks:

  • It requires excellent HTTP libraries. At the very least, the client and server must be able to stream HTTP/1.1 request and response bodies. For bidirectional streaming, both parties must support HTTP/2. Long-lived streams are much more likely to encounter bugs and edge cases in HTTP/2 flow control.
  • It requires excellent proxies. Every proxy between the server and client — including those run by cloud providers — must support HTTP/2.
  • It weakens the protections offered to your unary handlers, since streaming typically requires proxies to be configured with much longer timeouts.
  • It requires complex tools. Streaming RPC protocols are much more involved than unary protocols, so cURL and your browser's network inspector are useless.

In general, streaming ties your application more closely to your networking infrastructure and makes your application inaccessible to less-sophisticated clients. You can minimize these downsides by keeping streams short-lived.

All that said, connect-python fully supports client and server streaming. Bidirectional streaming is not yet supported in clients; on the server side, it requires an ASGI server that speaks HTTP/2.

Streaming variants

In Python, streaming messages use standard AsyncIterator for async servers and clients, or Iterator for sync servers and clients.
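
For example, a client-streaming handler consumes an AsyncIterator of requests and returns one response, while a server-streaming handler takes one request and yields its responses. A rough sketch of those shapes, reusing the Getting Started messages (the class names are illustrative, and the exact generated base-class signatures may differ):

from collections.abc import AsyncIterator

from greet.v1.greet_pb2 import GreetRequest, GreetResponse

class ClientStreamingGreeter:
    # Client streaming: consume many requests, return a single response.
    async def greet(self, request: AsyncIterator[GreetRequest], ctx) -> GreetResponse:
        names = [message.name async for message in request]
        return GreetResponse(greeting=f"Hello, {', '.join(names)}!")

class ServerStreamingGreeter:
    # Server streaming: take one request, yield many responses.
    async def greet(self, request: GreetRequest, ctx) -> AsyncIterator[GreetResponse]:
        for salutation in ("Hello", "Bonjour", "Hola"):
            yield GreetResponse(greeting=f"{salutation}, {request.name}!")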

In client streaming, the client sends multiple messages. Once the server receives all the messages, it responds with a single message. In Protobuf schemas, client streaming methods look like this:

service GreetService {
  rpc Greet(stream GreetRequest) returns (GreetResponse) {}
}

In server streaming, the client sends a single message and the server responds with multiple messages. In Protobuf schemas, server streaming methods look like this:

service GreetService {
  rpc Greet(GreetRequest) returns (stream GreetResponse) {}
}

In bidirectional streaming (often called bidi), the client and server may both send multiple messages. Often, the exchange is structured like a conversation: the client sends a message, the server responds, the client sends another message, and so on. Keep in mind that this always requires end-to-end HTTP/2 support! In Protobuf schemas, bidi streaming methods look like this:

service GreetService {
  rpc Greet(stream GreetRequest) returns (stream GreetResponse) {}
}

HTTP representation

Streaming responses always have an HTTP status of 200 OK. This may seem unusual, but it's unavoidable: the server may encounter an error after sending a few messages, when the HTTP status has already been sent to the client. Rather than relying on the HTTP status, streaming handlers encode any errors in HTTP trailers or at the end of the response body.

The body of streaming requests and responses envelopes your schema-defined messages with a few bytes of protocol-specific binary framing data. Because of the interspersed framing data, the payloads are no longer valid Protobuf or JSON: instead, they use protocol-specific Content-Types like application/connect+proto.
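
Under the Connect protocol, for example, each frame is a five-byte prefix (one flags byte plus a four-byte big-endian length) followed by the payload, and the final frame of a response sets an end-of-stream flag bit and carries any error and trailing metadata as JSON. A minimal illustration of that framing (connect-python handles this for you; nothing here is part of its public API):

import json
import struct

def envelope(payload: bytes, flags: int = 0) -> bytes:
    # One flags byte, a four-byte big-endian length, then the payload.
    return struct.pack(">BI", flags, len(payload)) + payload

# An ordinary frame wraps a serialized Protobuf (or JSON) message.
message_frame = envelope(b"\n\x05World")  # a serialized GreetRequest with name = "World"

# The final frame of a response sets the end-of-stream flag and carries
# errors and trailing metadata as a JSON object.
end_frame = envelope(json.dumps({"error": {"code": "unavailable"}}).encode(), flags=0b10)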

An example

Let's start by amending the GreetService we defined in Getting Started to make the Greet method use client streaming:

syntax = "proto3";

package greet.v1;

message GreetRequest {
  string name = 1;
}

message GreetResponse {
  string greeting = 1;
}

service GreetService {
  rpc Greet(stream GreetRequest) returns (GreetResponse) {}
}

After running buf generate to update our generated code, we can amend our service implementation in server.py:

from greet.v1.greet_connect import GreetService, GreetServiceASGIApplication
from greet.v1.greet_pb2 import GreetResponse

class Greeter(GreetService):
    async def greet(self, request, ctx):
        print("Request headers: ", ctx.request_headers())
        greeting = ""
        async for message in request:
            greeting += f"Hello, {message.name}!\n"
        response = GreetResponse(greeting=greeting)
        ctx.response_headers()["greet-version"] = "v1"
        return response

app = GreetServiceASGIApplication(Greeter())

That's it: metadata interceptors, such as our simple authentication interceptor, can be used as-is with no other changes.
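
On the calling side, a client-streaming call takes an iterator of requests and returns a single response. The sketch below is hypothetical: it assumes the generated module also exposes an async GreetServiceClient and that its constructor takes only a base URL, neither of which is confirmed here; see Getting Started for the actual client API.

import asyncio

from greet.v1.greet_connect import GreetServiceClient  # assumed name of the generated client
from greet.v1.greet_pb2 import GreetRequest

async def main():
    # Constructor shown with just a base URL; the real signature may differ.
    client = GreetServiceClient("http://localhost:8000")

    async def requests():
        # Yield several messages; finishing the generator ends the client's side of the stream.
        for name in ("Jane", "Ursula", "Octavia"):
            yield GreetRequest(name=name)

    response = await client.greet(requests())
    print(response.greeting)

asyncio.run(main())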