我有一个Spring Boot服务,它使用服务器发送的事件(SSE)将更新流向客户端。客户端连接的端点是用Spring WebFlux实现的。
为了清理资源(删除AMQP队列),我的服务需要检测客户是否关闭了EventSource,也就是终止了连接。为此,我通过
注册了一个回调。当然,我的SSE Flux会定期发送心跳,不仅是为了防止连接超时,也是为了
在客户端断开连接时触发
FluxSink#onDispose(Disposable)
。
onDispose
@Nonnull
@Override
public Flux<ServerSentEvent<?>> subscribeToNotifications(@Nonnull String queueName) {
final var queue = createQueue(queueName);
final var listenerContainer = createListenerContainer(queueName);
final var notificationStream = createNotificationStream(queueName, listenerContainer);
return notificationStream
.mergeWith(heartbeatStream)
.map(NotificationServiceImpl::toServerSentEvent);
@Nonnull
private Flux<NotificationDto> createNotificationStream(
@Nonnull String queueName,
@Nonnull MessageListenerContainer listenerContainer) {
return Flux.create(emitter -> {
listenerContainer.setupMessageListener(message -> handleAmqpMessage(message, emitter));
emitter.onRequest(consumer -> listenerContainer.start());
emitter.onDispose(() -> {
final var deleted = amqpAdmin.deleteQueue(queueName);
if (deleted) {
LOGGER.info("Queue {} successfully deleted", queueName);
} else {
LOGGER.warn("Failed to delete queue {}", queueName);
listenerContainer.stop();
这在本地运作得很好;一旦客户端断开连接,队列就会被删除/消息被记录下来。
然而,当把这个服务部署到我的Kubernetes集群时,onDispose
从未被调用。SSE流仍然工作得很好,也就是说,客户端从服务器接收所有数据,并且连接通过心跳保持活力。
我正在使用NGINX Ingress Controller来暴露我的服务,似乎NGINX和我的服务之间的连接在客户端断开后仍然保持活力,导致onDispose
从未被调用。因此,我试着将上游的保持连接设置为0
,但这并没有解决问题--服务从未被通知客户端已经关闭连接。
# Source: ingress-nginx/templates/controller-configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
labels:
helm.sh/chart: ingress-nginx-4.0.6
app.kubernetes.io/name: ingress-nginx
app.kubernetes.io/instance: ingress-nginx
app.kubernetes.io/version: 1.0.4
app.kubernetes.io/managed-by: Helm
app.kubernetes.io/component: controller
name: ingress-nginx-controller
namespace: ingress-nginx
data:
allow-snippet-annotations: 'true'
http-snippet: |
server{
listen 2443;
return 308 https://$host$request_uri;
proxy-real-ip-cidr: 192.168.0.0/16
use-forwarded-headers: 'true'
upstream-keepalive-connections: '0' # added this line
我错过了什么?