From b50deeea9d38169060fb719383afbb1ea713766c Mon Sep 17 00:00:00 2001 From: Adam McCoy Date: Wed, 29 Oct 2025 15:37:29 +1100 Subject: [PATCH] Add dodgy hook when a subscriber is established --- .../Transport/Protocol/ProtocolFixture.cs | 2 +- .../Transport/SecureClientFixture.cs | 4 +-- source/Halibut/HalibutRuntime.cs | 13 ++++++--- source/Halibut/HalibutRuntimeBuilder.cs | 13 +++++++-- .../Observability/ISubscriberObserver.cs | 24 +++++++++++++++++ .../Observability/NullSubscriberObserver.cs | 27 +++++++++++++++++++ .../Protocol/MessageExchangeProtocol.cs | 7 ++++- 7 files changed, 81 insertions(+), 9 deletions(-) create mode 100644 source/Halibut/Transport/Observability/ISubscriberObserver.cs create mode 100644 source/Halibut/Transport/Observability/NullSubscriberObserver.cs diff --git a/source/Halibut.Tests/Transport/Protocol/ProtocolFixture.cs b/source/Halibut.Tests/Transport/Protocol/ProtocolFixture.cs index 70b11fdd7..97142fdda 100644 --- a/source/Halibut.Tests/Transport/Protocol/ProtocolFixture.cs +++ b/source/Halibut.Tests/Transport/Protocol/ProtocolFixture.cs @@ -27,7 +27,7 @@ public void SetUp() stream.SetRemoteIdentity(new RemoteIdentity(RemoteIdentityType.Server)); var limits = new HalibutTimeoutsAndLimitsForTestsBuilder().Build(); var activeConnectionsLimiter = new ActiveTcpConnectionsLimiter(limits); - protocol = new MessageExchangeProtocol(stream, new HalibutTimeoutsAndLimitsForTestsBuilder().Build(), activeConnectionsLimiter, Substitute.For()); + protocol = new MessageExchangeProtocol(stream, new HalibutTimeoutsAndLimitsForTestsBuilder().Build(), activeConnectionsLimiter, Substitute.For(), new NullSubscriberObserver()); } // TODO - ASYNC ME UP! ExchangeAsClientAsync cancellation diff --git a/source/Halibut.Tests/Transport/SecureClientFixture.cs b/source/Halibut.Tests/Transport/SecureClientFixture.cs index d6df4e782..eff0e749f 100644 --- a/source/Halibut.Tests/Transport/SecureClientFixture.cs +++ b/source/Halibut.Tests/Transport/SecureClientFixture.cs @@ -67,7 +67,7 @@ public async Task SecureClientClearsPoolWhenAllConnectionsCorrupt() var connection = Substitute.For(); var limits = new HalibutTimeoutsAndLimitsForTestsBuilder().Build(); var activeConnectionLimiter = new ActiveTcpConnectionsLimiter(limits); - connection.Protocol.Returns(new MessageExchangeProtocol(stream, limits, activeConnectionLimiter, log)); + connection.Protocol.Returns(new MessageExchangeProtocol(stream, limits, activeConnectionLimiter, log, new NullSubscriberObserver())); await connectionManager.ReleaseConnectionAsync(endpoint, connection, CancellationToken.None); } @@ -102,7 +102,7 @@ static MessageExchangeProtocol GetProtocol(Stream stream, ILog logger) { var limits = new HalibutTimeoutsAndLimitsForTestsBuilder().Build(); var activeConnectionLimiter = new ActiveTcpConnectionsLimiter(limits); - return new MessageExchangeProtocol(new MessageExchangeStream(stream, new MessageSerializerBuilder(new LogFactory()).Build(), new NoOpControlMessageObserver(), limits, logger), limits, activeConnectionLimiter, logger); + return new MessageExchangeProtocol(new MessageExchangeStream(stream, new MessageSerializerBuilder(new LogFactory()).Build(), new NoOpControlMessageObserver(), limits, logger), limits, activeConnectionLimiter, logger, new NullSubscriberObserver()); } } } \ No newline at end of file diff --git a/source/Halibut/HalibutRuntime.cs b/source/Halibut/HalibutRuntime.cs index 4338e3cca..65e3e6ad3 100644 --- a/source/Halibut/HalibutRuntime.cs +++ b/source/Halibut/HalibutRuntime.cs @@ -47,6 +47,7 @@ public class HalibutRuntime : IHalibutRuntime readonly IActiveTcpConnectionsLimiter activeTcpConnectionsLimiter; readonly IControlMessageObserver controlMessageObserver; readonly ISslConfigurationProvider sslConfigurationProvider; + readonly ISubscriberObserver subscriberObserver; internal HalibutRuntime( IServiceFactory serviceFactory, @@ -63,8 +64,8 @@ internal HalibutRuntime( IConnectionsObserver connectionsObserver, IControlMessageObserver controlMessageObserver, ISecureConnectionObserver secureConnectionObserver, - ISslConfigurationProvider sslConfigurationProvider - ) + ISslConfigurationProvider sslConfigurationProvider, + ISubscriberObserver subscriberObserver) { this.serverCertificate = serverCertificate; this.trustProvider = trustProvider; @@ -79,6 +80,7 @@ ISslConfigurationProvider sslConfigurationProvider TimeoutsAndLimits = halibutTimeoutsAndLimits; this.connectionsObserver = connectionsObserver; this.secureConnectionObserver = secureConnectionObserver; + this.subscriberObserver = subscriberObserver; this.controlMessageObserver = controlMessageObserver; this.sslConfigurationProvider = sslConfigurationProvider; @@ -121,7 +123,12 @@ public int Listen(int port) ExchangeProtocolBuilder ExchangeProtocolBuilder() { - return (stream, log) => new MessageExchangeProtocol(new MessageExchangeStream(stream, messageSerializer, controlMessageObserver, TimeoutsAndLimits, log), TimeoutsAndLimits, activeTcpConnectionsLimiter, log); + return (stream, log) => new MessageExchangeProtocol( + new MessageExchangeStream(stream, messageSerializer, controlMessageObserver, TimeoutsAndLimits, log), + TimeoutsAndLimits, + activeTcpConnectionsLimiter, + log, + subscriberObserver); } public int Listen(IPEndPoint endpoint) diff --git a/source/Halibut/HalibutRuntimeBuilder.cs b/source/Halibut/HalibutRuntimeBuilder.cs index acd85cc65..531375db7 100644 --- a/source/Halibut/HalibutRuntimeBuilder.cs +++ b/source/Halibut/HalibutRuntimeBuilder.cs @@ -33,6 +33,7 @@ public class HalibutRuntimeBuilder IControlMessageObserver? controlMessageObserver; MessageStreamWrappers queueMessageStreamWrappers = new(); ISslConfigurationProvider? sslConfigurationProvider; + ISubscriberObserver? subscriberObserver; public HalibutRuntimeBuilder WithQueueMessageStreamWrappers(MessageStreamWrappers queueMessageStreamWrappers) { @@ -58,6 +59,12 @@ public HalibutRuntimeBuilder WithSslConfigurationProvider(ISslConfigurationProvi return this; } + public HalibutRuntimeBuilder WithSubscriptionObserver(ISubscriberObserver subscriberObserver) + { + this.subscriberObserver = subscriberObserver; + return this; + } + internal HalibutRuntimeBuilder WithStreamFactory(IStreamFactory streamFactory) { this.streamFactory = streamFactory; @@ -194,6 +201,7 @@ public HalibutRuntime Build() var rpcObserver = this.rpcObserver ?? new NoRpcObserver(); var controlMessageObserver = this.controlMessageObserver ?? new NoOpControlMessageObserver(); var sslConfigurationProvider = this.sslConfigurationProvider ?? SslConfiguration.Default; + var subscriberObserver = this.subscriberObserver ?? new NullSubscriberObserver(); var halibutRuntime = new HalibutRuntime( serviceFactory, @@ -210,7 +218,8 @@ public HalibutRuntime Build() connectionsObserver, controlMessageObserver, secureConnectionObserver, - sslConfigurationProvider + sslConfigurationProvider, + subscriberObserver ); if (onUnauthorizedClientConnect is not null) @@ -221,4 +230,4 @@ public HalibutRuntime Build() return halibutRuntime; } } -} \ No newline at end of file +} diff --git a/source/Halibut/Transport/Observability/ISubscriberObserver.cs b/source/Halibut/Transport/Observability/ISubscriberObserver.cs new file mode 100644 index 000000000..a19dc62bc --- /dev/null +++ b/source/Halibut/Transport/Observability/ISubscriberObserver.cs @@ -0,0 +1,24 @@ +// Copyright 2012-2013 Octopus Deploy Pty. Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System.Threading; +using System.Threading.Tasks; + +namespace Halibut.Transport.Observability +{ + public interface ISubscriberObserver + { + Task SubscriberConnected(string uri, CancellationToken cancellationToken); + } +} \ No newline at end of file diff --git a/source/Halibut/Transport/Observability/NullSubscriberObserver.cs b/source/Halibut/Transport/Observability/NullSubscriberObserver.cs new file mode 100644 index 000000000..fbc659ed9 --- /dev/null +++ b/source/Halibut/Transport/Observability/NullSubscriberObserver.cs @@ -0,0 +1,27 @@ +// Copyright 2012-2013 Octopus Deploy Pty. Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System.Threading; +using System.Threading.Tasks; + +namespace Halibut.Transport.Observability +{ + public class NullSubscriberObserver : ISubscriberObserver + { + public Task SubscriberConnected(string uri, CancellationToken cancellationToken) + { + return Task.CompletedTask; + } + } +} \ No newline at end of file diff --git a/source/Halibut/Transport/Protocol/MessageExchangeProtocol.cs b/source/Halibut/Transport/Protocol/MessageExchangeProtocol.cs index 777b7e268..a5930f9e3 100644 --- a/source/Halibut/Transport/Protocol/MessageExchangeProtocol.cs +++ b/source/Halibut/Transport/Protocol/MessageExchangeProtocol.cs @@ -5,6 +5,7 @@ using Halibut.Diagnostics; using Halibut.Exceptions; using Halibut.ServiceModel; +using Halibut.Transport.Observability; namespace Halibut.Transport.Protocol { @@ -21,15 +22,18 @@ public class MessageExchangeProtocol readonly HalibutTimeoutsAndLimits halibutTimeoutsAndLimits; readonly IActiveTcpConnectionsLimiter activeTcpConnectionsLimiter; readonly ILog log; + readonly ISubscriberObserver subscriberObserver; bool identified; volatile bool acceptClientRequests = true; - public MessageExchangeProtocol(IMessageExchangeStream stream, HalibutTimeoutsAndLimits halibutTimeoutsAndLimits, IActiveTcpConnectionsLimiter activeTcpConnectionsLimiter, ILog log) + public MessageExchangeProtocol(IMessageExchangeStream stream, HalibutTimeoutsAndLimits halibutTimeoutsAndLimits, IActiveTcpConnectionsLimiter activeTcpConnectionsLimiter, ILog log, + ISubscriberObserver subscriberObserver) { this.stream = stream; this.halibutTimeoutsAndLimits = halibutTimeoutsAndLimits; this.activeTcpConnectionsLimiter = activeTcpConnectionsLimiter; this.log = log; + this.subscriberObserver = subscriberObserver; } public async Task ExchangeAsClientAsync(RequestMessage request, CancellationToken cancellationToken) @@ -112,6 +116,7 @@ public async Task ExchangeAsServerAsync(Func