QGrpcClientInterceptor caching example

Caching Interceptor

The Caching Interceptor can be a versatile tool for improving the performance of your Qt GRPC connections. By creating a custom interceptor, you can tailor the caching behavior to meet the specific requirements of your application.

Creating a Caching Interceptor

To create a Caching Interceptor, you'll need to subclass QGrpcClientInterceptor and override the appropriate interception method(s) to incorporate the caching functionality.

Prerequisites

To establish what types can be processed by our interceptor, let's say our .proto file is:

syntax = "proto3";

message SimpleStringMessage {
    string testFieldString = 6;
}

service TestService {
    rpc testMethod(SimpleStringMessage) returns (SimpleStringMessage) {}
    rpc testMethodServerStream(SimpleStringMessage) returns (stream SimpleStringMessage) {}
}

We also assume that the MyCacheStorage API provides the following methods:

  • insert() adds a new entry to the cache. In our case, the entries are stored as QString.
  • insert_or_append() works like insert(), but appends the data if an entry for the given method and service already exists and is not finalized yet. This is used to cache streamed responses.
  • find() looks up the entry stored for the given method and service keys.
  • finalize() marks the entry in the cache as complete. For the streaming use case, calling this method means that the full response has been cached.
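
The list above describes MyCacheStorage only informally, so the following is a minimal sketch of how such a storage could look, not the actual class. It makes several assumptions: std::string stands in for QString so that the sketch compiles without Qt, entries are keyed by the service and method names only, streamed messages are simply concatenated, and find() hides entries that are not finalized yet.

```cpp
#include <cassert>
#include <map>
#include <optional>
#include <string>
#include <utility>

// Hypothetical sketch of MyCacheStorage. std::string replaces QString here
// so that the example does not depend on Qt.
template <typename T>
class MyCacheStorage
{
public:
    // Stores a complete response and replaces any previous entry.
    void insert(const std::string &method, const std::string &service, const T &value)
    {
        m_entries[Key{ service, method }] = Entry{ value, true };
    }

    // Creates an unfinalized entry, or appends to an existing unfinalized one.
    // Assumption: an entry that is already finalized is left untouched.
    void insert_or_append(const std::string &method, const std::string &service,
                          const T &value)
    {
        auto [it, inserted] = m_entries.try_emplace(Key{ service, method },
                                                    Entry{ value, false });
        if (!inserted && !it->second.finalized)
            it->second.value += value;
    }

    // Marks an existing entry as complete so that find() may return it.
    void finalize(const std::string &method, const std::string &service)
    {
        const auto it = m_entries.find(Key{ service, method });
        if (it != m_entries.end())
            it->second.finalized = true;
    }

    // Returns the cached value only if the entry exists and is finalized.
    std::optional<T> find(const std::string &method, const std::string &service) const
    {
        const auto it = m_entries.find(Key{ service, method });
        if (it == m_entries.end() || !it->second.finalized)
            return std::nullopt;
        return it->second.value;
    }

private:
    using Key = std::pair<std::string, std::string>;
    struct Entry
    {
        T value;
        bool finalized = false;
    };
    std::map<Key, Entry> m_entries;
};
```

With this sketch, a streamed response stays invisible to find() while messages are still being appended with insert_or_append(), and becomes available once finalize() has been called for the same method and service.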

CachingInterceptor implementation

Here's an example of a simple caching interceptor:

class CachingInterceptor : public QGrpcClientInterceptor
{
protected:
    void interceptCall(std::shared_ptr<QGrpcChannelOperation> operation,
                       std::shared_ptr<QGrpcCallReply> response,
                       QGrpcInterceptorContinuation<QGrpcCallReply> &continuation) override
    {
        // Cache the response once the call has finished
        QObject::connect(response.get(), &QGrpcCallReply::finished, this,
                         [this, operation, response] {
                             if (const auto mess = response->read<SimpleStringMessage>())
                                 cache.insert(operation->method(), operation->service(),
                                              mess->testFieldString());
                         });
        // Deserialize the request
        SimpleStringMessage deserializedArg;
        if (!operation->serializer()->deserialize(&deserializedArg, operation->arg())) {
            qCritical() << "Deserialization of arg failed.";
            return;
        }
        // Look up a cached response for this call
        std::optional<QString> cachedStr = cache.find(operation->method(), operation->service(), deserializedArg);
        if (cachedStr) {
            // Serialize the cached response
            SimpleStringMessage val;
            val.setTestFieldString(*cachedStr);
            const auto serializedValue = operation->serializer()->serialize<SimpleStringMessage>(&val);
            // Set the server metadata cached field before notifying receivers
            auto metadata = operation->serverMetadata();
            metadata.insert({ "cached", "true" });
            operation->setServerMetadata(metadata);
            emit operation->dataReady(serializedValue);
            emit operation->finished();
            return;
        }
        // Cache miss: pass the call on to the rest of the chain
        continuation(std::move(response), operation);
    }

    void interceptServerStream(std::shared_ptr<QGrpcChannelOperation> operation,
                               std::shared_ptr<QGrpcServerStream> stream,
                               QGrpcInterceptorContinuation<QGrpcServerStream> &continuation) override
    {
        // Append every received message to the cache entry
        QObject::connect(stream.get(), &QGrpcServerStream::messageReceived, this,
                         [this, operation, stream] {
                             if (const auto mess = stream->read<SimpleStringMessage>())
                                 cache.insert_or_append(operation->method(), operation->service(),
                                                        mess->testFieldString());
                         });
        // Finalize the cache entry once the stream has finished
        QObject::connect(stream.get(), &QGrpcServerStream::finished, this,
                         [this, operation] {
                             cache.finalize(operation->method(), operation->service());
                         });
        // Deserialize the request
        SimpleStringMessage deserializedArg;
        if (!operation->serializer()->deserialize(&deserializedArg, operation->arg())) {
            qCritical() << "Deserialization of arg failed.";
            return;
        }
        // Look up a cached response for this stream
        std::optional<QString> cachedStr = cache.find(operation->method(), operation->service(), deserializedArg);
        if (cachedStr) {
            // Serialize the cached response
            SimpleStringMessage val;
            val.setTestFieldString(*cachedStr);
            const auto serializedValue = operation->serializer()->serialize<SimpleStringMessage>(&val);
            // Set the server metadata cached field before notifying receivers
            auto metadata = operation->serverMetadata();
            metadata.insert({ "cached", "true" });
            operation->setServerMetadata(metadata);
            emit operation->dataReady(serializedValue);
            emit operation->finished();
            return;
        }
        // Cache miss: pass the stream on to the rest of the chain
        continuation(std::move(stream), operation);
    }

    MyCacheStorage<QString> cache;
};

The interceptCall() and interceptServerStream() methods of the CachingInterceptor class intercept Qt GRPC calls and server streams, respectively, and implement a caching mechanism for their responses. Both methods connect to the signals of the response object to capture incoming messages, and both deserialize the request argument. Both then check the cache: if a cached response is found, they serialize it, emit the dataReady and finished signals of the operation to deliver it, and set the server metadata key cached to true. Otherwise, they hand the request over to the continuation.

The key difference lies in how the two methods populate the cache. interceptCall() relies on the response's finished signal alone and caches the single reply once the call completes. interceptServerStream() connects to both the stream's messageReceived and finished signals: every received message is appended to the cache entry, and the entry is finalized when the stream finishes. As a result, interceptServerStream() returns a cached response only if the full stream was cached.
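
The cache-hit and cache-miss paths described above can be modelled without Qt. The following sketch is a simplified, synchronous model, not the Qt GRPC API: Response, Continuation and CachingFlowModel are hypothetical names, the continuation returns the response directly instead of delivering it through signals, and the cache is keyed by the method name only.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <string>

// Simplified stand-ins used by this sketch only; these are not Qt GRPC types.
struct Response
{
    std::string body;
    std::map<std::string, std::string> metadata;
};

// Models the rest of the chain (further interceptors and the channel).
using Continuation = std::function<Response(const std::string &request)>;

class CachingFlowModel
{
public:
    Response intercept(const std::string &method, const std::string &request,
                       const Continuation &continuation)
    {
        // Cache hit: answer locally, tag the response, skip the continuation.
        const auto it = m_cache.find(method);
        if (it != m_cache.end()) {
            Response hit;
            hit.body = it->second;
            hit.metadata["cached"] = "true";
            return hit;
        }

        // Cache miss: forward the request, then remember the result.
        Response response = continuation(request);
        m_cache[method] = response.body;
        return response;
    }

private:
    std::map<std::string, std::string> m_cache; // keyed by method name only
};
```

In this model, the first call for a method reaches the continuation and its result is stored. A second call for the same method is answered from the cache, carries the cached metadata entry, and never reaches the continuation.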

Registering the Caching Interceptor

Next, you'll need to register the Caching Interceptor with the QGrpcClientInterceptorManager. This ensures that it becomes part of the interceptor chain.

QGrpcClientInterceptorManager manager;
auto cachingInterceptor = std::make_shared<CachingInterceptor>();
manager.registerInterceptor(cachingInterceptor);
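
To illustrate what being part of the interceptor chain means, here is a small Qt-free model of such a chain. All names in it (Handler, Interceptor, InterceptorChainModel, TagInterceptor) are hypothetical and do not belong to Qt GRPC. The invocation order used here, with the most recently registered interceptor running first, is an assumption of this sketch; consult the QGrpcClientInterceptorManager documentation for the order it actually guarantees.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical model of an interceptor chain; not the Qt GRPC API.
using Handler = std::function<std::string(const std::string &)>;

struct Interceptor
{
    virtual ~Interceptor() = default;
    // An interceptor may answer the request itself or delegate to next.
    virtual std::string intercept(const std::string &request, const Handler &next) = 0;
};

class InterceptorChainModel
{
public:
    void registerInterceptor(std::shared_ptr<Interceptor> interceptor)
    {
        m_interceptors.push_back(std::move(interceptor));
    }

    // Wraps the channel handler with every registered interceptor.
    // Assumption: the most recently registered interceptor runs first.
    std::string run(const std::string &request, Handler channel) const
    {
        Handler next = std::move(channel);
        for (const auto &interceptor : m_interceptors) {
            next = [interceptor, inner = std::move(next)](const std::string &r) {
                return interceptor->intercept(r, inner);
            };
        }
        return next(request);
    }

private:
    std::vector<std::shared_ptr<Interceptor>> m_interceptors;
};

// Example interceptor that appends a tag and always delegates.
struct TagInterceptor : Interceptor
{
    explicit TagInterceptor(std::string tag) : m_tag(std::move(tag)) {}

    std::string intercept(const std::string &request, const Handler &next) override
    {
        return next(request + m_tag);
    }

    std::string m_tag;
};
```

A caching interceptor fits into this model as an interceptor that returns a stored value without calling next when it has a cache hit, which is why its position in the chain decides which other interceptors still see the request.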

© 2024 The Qt Company Ltd. Documentation contributions included herein are the copyrights of their respective owners. The documentation provided herein is licensed under the terms of the GNU Free Documentation License version 1.3 as published by the Free Software Foundation. Qt and respective logos are trademarks of The Qt Company Ltd. in Finland and/or other countries worldwide. All other trademarks are property of their respective owners.