Skip to content

Commit

Permalink
Allow adding a custom interceptor and add a code example to demonstra…
Browse files Browse the repository at this point in the history
…te how to do so

Change-Id: I9f508f1c0dd982de82ef95d6a2994df36b764e3b
  • Loading branch information
Raibaz committed Mar 6, 2024
1 parent 210a2b9 commit 06b06ab
Show file tree
Hide file tree
Showing 5 changed files with 209 additions and 5 deletions.
4 changes: 2 additions & 2 deletions Google.Ads.Gax/src/Interceptors/StreamingRpcInterceptor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ namespace Google.Ads.Gax.Interceptors
/// </summary>
/// <typeparam name="TResponse">The type of the response.</typeparam>
/// <seealso cref="Grpc.Core.IAsyncStreamReader{TResponse}" />
internal class StreamingRpcInterceptor<TResponse> : IAsyncStreamReader<TResponse>
public class StreamingRpcInterceptor<TResponse> : IAsyncStreamReader<TResponse>
where TResponse : class
{
/// <summary>
Expand All @@ -45,7 +45,7 @@ internal class StreamingRpcInterceptor<TResponse> : IAsyncStreamReader<TResponse
/// <param name="streamReader">The stream reader to wrap over..</param>
/// <param name="moveNextCallback">The callback to be invoked when MoveNext() is
/// called.</param>
internal StreamingRpcInterceptor(IAsyncStreamReader<TResponse> streamReader,
public StreamingRpcInterceptor(IAsyncStreamReader<TResponse> streamReader,
Action<TResponse, AggregateException> moveNextCallback)
{
this.innerStreamReader = streamReader;
Expand Down
4 changes: 2 additions & 2 deletions Google.Ads.Gax/src/Interceptors/UnaryRpcInterceptor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ namespace Google.Ads.Gax.Interceptors
/// This class provides functionality to intercept tasks and callbacks, and performs custom
/// exception handling.
/// </summary>
internal class UnaryRpcInterceptor
public class UnaryRpcInterceptor
{
/// <summary>
/// Intercepts an async unary call adds a custom exception handler.
Expand All @@ -34,7 +34,7 @@ internal class UnaryRpcInterceptor
/// <param name="responseCallback">Response callback</param>
/// <returns>The async unary call with custom exception handling for
/// <see cref="RpcException"/>.</returns>
internal static AsyncUnaryCall<TResponse> Intercept<TResponse>(
public static AsyncUnaryCall<TResponse> Intercept<TResponse>(
AsyncUnaryCall<TResponse> call, Action<Task<TResponse>> responseCallback)
{
return new AsyncUnaryCall<TResponse>(
Expand Down
17 changes: 16 additions & 1 deletion Google.Ads.GoogleAds.Core/src/Lib/GoogleAdsClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
using Google.Ads.Gax.Lib;
using Google.Ads.GoogleAds.Config;
using Google.Api.Gax.Grpc;
using Grpc.Core.Interceptors;
using System;
using System.Collections.Generic;

namespace Google.Ads.GoogleAds.Lib
{
Expand All @@ -25,6 +27,9 @@ namespace Google.Ads.GoogleAds.Lib
/// </summary>
public class GoogleAdsClient : AdsClient<GoogleAdsConfig>
{

private List<Interceptor> userInterceptors = new List<Interceptor>();

/// <summary>
/// Initializes a new instance of the <see cref="GoogleAdsClient"/> class.
/// </summary>
Expand All @@ -39,6 +44,15 @@ public GoogleAdsClient() : this(new GoogleAdsConfig())
/// <param name="config">The client configuration.</param>
public GoogleAdsClient(GoogleAdsConfig config) : base(config) { }

/// <summary>
/// Adds a custom gRPC <see cref="Interceptor"/>.
/// </summary>
/// <param name="interceptor">The custom interceptor.</param>
public void AddInterceptor(Interceptor interceptor)
{
userInterceptors.Add(interceptor);
}

/// <summary>
/// <para>Gets an instance of the specified service. Use this method with a predefined
/// list of templates available for each supported version. E.g.</para>
Expand All @@ -59,7 +73,8 @@ public TService GetService<TService, TServiceSetting>(
where TServiceSetting : ServiceSettingsBase, new()
where TService : GoogleAdsServiceClientBase
{
GoogleAdsServiceClientFactory factory = new GoogleAdsServiceClientFactory();
GoogleAdsServiceClientFactory factory =
new GoogleAdsServiceClientFactory(customInterceptors);
TService service = factory.GetService(serviceTemplate, Config);
service.ServiceContext.Client = this;
return service;
Expand Down
16 changes: 16 additions & 0 deletions Google.Ads.GoogleAds.Core/src/Lib/GoogleAdsServiceClientFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
using Grpc.Core;
using Grpc.Core.Interceptors;
using System;
using System.Collections.Generic;

namespace Google.Ads.GoogleAds.Lib
{
Expand All @@ -30,11 +31,21 @@ namespace Google.Ads.GoogleAds.Lib
/// </summary>
internal class GoogleAdsServiceClientFactory : AdsServiceClientFactory
{
/// <summary>
/// The custom interceptors.
/// </summary>
private List<Interceptor> userInterceptors = new List<Interceptor>();

/// <summary>
/// The channel factory.
/// </summary>
private CachedChannelFactory channelFactory = new CachedChannelFactory();

internal GoogleAdsServiceClientFactory(List<Interceptor> userInterceptors)
{
this.userInterceptors = userInterceptors;
}

/// <summary>
/// Gets an instance of the specified service.
/// </summary>
Expand All @@ -51,6 +62,11 @@ internal TService GetService<TService, TServiceSetting>(
CallInvoker interceptedInvoker = channel
.Intercept(new GoogleAdsGrpcInterceptor());

foreach (Interceptor customInterceptor in userInterceptors)
{
interceptedInvoker = interceptedInvoker.Intercept(customInterceptor);
}

CallInvoker callInvoker = config.EnableProfiling ?
new ProfilingCallInvoker(interceptedInvoker, config) : interceptedInvoker;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
using CommandLine;
using Google.Api.Gax;
using Google.Ads.Gax.Examples;
using Google.Ads.Gax.Interceptors;
using Google.Ads.GoogleAds.Lib;
using Google.Ads.GoogleAds.V16.Errors;
using Google.Ads.GoogleAds.V16.Services;
using Grpc.Core;
using Grpc.Core.Interceptors;
using System;

using System.Threading.Tasks;

namespace Google.Ads.GoogleAds.Examples.V16
{
/// <summary>
/// This code example shows how to add a custom gRPC interceptor.
/// </summary>
public class AddCustomGrpcInterceptor : ExampleBase
{
/// <summary>
/// Command line options for running the <see cref="AddCustomGrpcInterceptor"/> example.
/// </summary>
public class Options : OptionsBase
{
/// <summary>
/// The Google Ads customer ID for which the call is made.
/// </summary>
[Option("customerId", Required = true, HelpText =
"The Google Ads customer ID for which the call is made.")]
public long CustomerId { get; set; }
}

/// <summary>
/// Main method, to run this code example as a standalone application.
/// </summary>
/// <param name="args">The command line arguments.</param>
public static void Main(string[] args)
{
Options options = ExampleUtilities.ParseCommandLine<Options>(args);

AddCustomGrpcInterceptor codeExample = new AddCustomGrpcInterceptor();
Console.WriteLine(codeExample.Description);
GoogleAdsClient client = new GoogleAdsClient();
// Add a custom interceptor.
client.AddInterceptor(new CustomInterceptor());
codeExample.Run(client,
options.CustomerId);
}

/// <summary>
/// Returns a description about the code example.
/// </summary>
public override string Description =>
"This code example shows how to add a custom gRPC interceptor.";

/// <summary>
/// Runs the code example.
/// </summary>
/// <param name="client">The Google Ads client.</param>
/// <param name="customerId">The Google Ads customer ID for which the call is made.</param>
public void Run(GoogleAdsClient client, long customerId)
{
// Get the GoogleAdsService.
GoogleAdsServiceClient googleAdsService = client.GetService(
Services.V16.GoogleAdsService);

// Create a query that will retrieve all campaigns, just to demonstrate usage of the
// custom interceptor.
string query = @"SELECT
campaign.id,
FROM campaign
ORDER BY campaign.id";

try
{
// Issue a streaming search request; we don't need to do anything with the response
// here, we just want to demonstrate usage of the interceptor.
googleAdsService.SearchStream(customerId.ToString(), query,
delegate (SearchGoogleAdsStreamResponse resp){}
);
}
catch (GoogleAdsException e)
{
Console.WriteLine("Failure:");
Console.WriteLine($"Message: {e.Message}");
Console.WriteLine($"Failure: {e.Failure}");
Console.WriteLine($"Request ID: {e.RequestId}");
throw;
}

try
{
// Issue a non-streaming call.
PagedEnumerable<SearchGoogleAdsResponse, GoogleAdsRow> response =
googleAdsService.Search(customerId.ToString(), query);
foreach (GoogleAdsRow googleAdsRow in response)
{
// The response for Search is lazy, meaning that the actual gRPC call will be
// sent only when the response is actually accessed; to demonstrate usage of
// the interceptor, then, we need to ensure the call is sent by looping
// through the response results.
Console.WriteLine("Campaign with ID {0} was found.",
googleAdsRow.Campaign.Id, googleAdsRow.Campaign.Name);
}
}
catch (GoogleAdsException e)
{
Console.WriteLine("Failure:");
Console.WriteLine($"Message: {e.Message}");
Console.WriteLine($"Failure: {e.Failure}");
Console.WriteLine($"Request ID: {e.RequestId}");
throw;
}
}
}

/// <summary>
/// A custom interceptor for both streaming and non-streaming gRPC calls.
/// </summary>
internal class CustomInterceptor : Interceptor
{

public override AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(
TRequest request, ClientInterceptorContext<TRequest, TResponse> context,
AsyncUnaryCallContinuation<TRequest, TResponse> continuationCallback)
{
AsyncUnaryCall<TResponse> call = continuationCallback(request, context);

Action<Task<TResponse>> callback = delegate (Task<TResponse> oldTask)
{
Console.WriteLine($"Intercepted a non-streaming call to {context.Method.Name}");
};

return UnaryRpcInterceptor.Intercept(call, callback);
}

public override AsyncServerStreamingCall<TResponse> AsyncServerStreamingCall<TRequest, TResponse>(
TRequest request, ClientInterceptorContext<TRequest, TResponse> context,
AsyncServerStreamingCallContinuation<TRequest, TResponse> continuation)
{
AsyncServerStreamingCall<TResponse> call = continuation(request, context);
StreamingRpcInterceptor<TResponse> responseStream = null;

responseStream = new StreamingRpcInterceptor<TResponse>(call.ResponseStream,
delegate (TResponse response, AggregateException rpcException)
{
Console.WriteLine($"Intercepted a streaming call to {context.Method.Name}");
});

return new AsyncServerStreamingCall<TResponse>(
responseStream,
call.ResponseHeadersAsync,
call.GetStatus,
call.GetTrailers,
call.Dispose
);
}
}
}

0 comments on commit 06b06ab

Please sign in to comment.