Bei der Definition von WebSockets deklarieren Sie normalerweise einen Parameter vom Typ WebSocket und können damit Daten vom Client lesen und an ihn senden. Er wird direkt von Starlette bereitgestellt, Sie können ihn aber von fastapi importieren:
fromfastapiimportWebSocket
Tipp
Wenn Sie Abhängigkeiten definieren möchten, die sowohl mit HTTP als auch mit WebSockets kompatibel sein sollen, können Sie einen Parameter definieren, der eine HTTPConnection anstelle eines Request oder eines WebSocket akzeptiert.
asyncdefreceive(self)->Message:""" Receive ASGI websocket messages, ensuring valid state transitions. """ifself.client_state==WebSocketState.CONNECTING:message=awaitself._receive()message_type=message["type"]ifmessage_type!="websocket.connect":raiseRuntimeError('Expected ASGI message "websocket.connect", 'f"but got {message_type!r}")self.client_state=WebSocketState.CONNECTEDreturnmessageelifself.client_state==WebSocketState.CONNECTED:message=awaitself._receive()message_type=message["type"]ifmessage_typenotin{"websocket.receive","websocket.disconnect"}:raiseRuntimeError('Expected ASGI message "websocket.receive" or 'f'"websocket.disconnect", but got {message_type!r}')ifmessage_type=="websocket.disconnect":self.client_state=WebSocketState.DISCONNECTEDreturnmessageelse:raiseRuntimeError('Cannot call "receive" once a disconnect message has been received.')
asyncdefsend(self,message:Message)->None:""" Send ASGI websocket messages, ensuring valid state transitions. """ifself.application_state==WebSocketState.CONNECTING:message_type=message["type"]ifmessage_typenotin{"websocket.accept","websocket.close","websocket.http.response.start",}:raiseRuntimeError('Expected ASGI message "websocket.accept",''"websocket.close" or "websocket.http.response.start",'f"but got {message_type!r}")ifmessage_type=="websocket.close":self.application_state=WebSocketState.DISCONNECTEDelifmessage_type=="websocket.http.response.start":self.application_state=WebSocketState.RESPONSEelse:self.application_state=WebSocketState.CONNECTEDawaitself._send(message)elifself.application_state==WebSocketState.CONNECTED:message_type=message["type"]ifmessage_typenotin{"websocket.send","websocket.close"}:raiseRuntimeError('Expected ASGI message "websocket.send" or "websocket.close", 'f"but got {message_type!r}")ifmessage_type=="websocket.close":self.application_state=WebSocketState.DISCONNECTEDtry:awaitself._send(message)exceptOSError:self.application_state=WebSocketState.DISCONNECTEDraiseWebSocketDisconnect(code=1006)elifself.application_state==WebSocketState.RESPONSE:message_type=message["type"]ifmessage_type!="websocket.http.response.body":raiseRuntimeError('Expected ASGI message "websocket.http.response.body", 'f"but got {message_type!r}")ifnotmessage.get("more_body",False):self.application_state=WebSocketState.DISCONNECTEDawaitself._send(message)else:raiseRuntimeError('Cannot call "send" once a close message has been sent.')
asyncdefaccept(self,subprotocol:str|None=None,headers:typing.Iterable[tuple[bytes,bytes]]|None=None,)->None:headers=headersor[]ifself.client_state==WebSocketState.CONNECTING:# If we haven't yet seen the 'connect' message, then wait for it first.awaitself.receive()awaitself.send({"type":"websocket.accept","subprotocol":subprotocol,"headers":headers})
asyncdefreceive_text(self)->str:ifself.application_state!=WebSocketState.CONNECTED:raiseRuntimeError('WebSocket is not connected. Need to call "accept" first.')message=awaitself.receive()self._raise_on_disconnect(message)returntyping.cast(str,message["text"])
asyncdefreceive_bytes(self)->bytes:ifself.application_state!=WebSocketState.CONNECTED:raiseRuntimeError('WebSocket is not connected. Need to call "accept" first.')message=awaitself.receive()self._raise_on_disconnect(message)returntyping.cast(bytes,message["bytes"])
asyncdefreceive_json(self,mode:str="text")->typing.Any:ifmodenotin{"text","binary"}:raiseRuntimeError('The "mode" argument should be "text" or "binary".')ifself.application_state!=WebSocketState.CONNECTED:raiseRuntimeError('WebSocket is not connected. Need to call "accept" first.')message=awaitself.receive()self._raise_on_disconnect(message)ifmode=="text":text=message["text"]else:text=message["bytes"].decode("utf-8")returnjson.loads(text)
asyncdefsend_json(self,data:typing.Any,mode:str="text")->None:ifmodenotin{"text","binary"}:raiseRuntimeError('The "mode" argument should be "text" or "binary".')text=json.dumps(data,separators=(",",":"),ensure_ascii=False)ifmode=="text":awaitself.send({"type":"websocket.send","text":text})else:awaitself.send({"type":"websocket.send","bytes":text.encode("utf-8")})