How to parse a Protobuf object in a Kafka message with Trino

Hello!

I hope this is not the wrong section. If so, please move it :pray:

I run Trino in a Docker container with this command, mounting the configuration files:

sudo docker run -p 8080:8080 \
  -v /mnt/c/trino-config-test/catalog/kafka.properties:/etc/trino/catalog/kafka.properties \
  -v /mnt/c/trino-config-test/topic_description/kafka.mario.test.json:/etc/kafka/kafka.mario.test.json \
  trinodb/trino

kafka.properties

connector.name=kafka
kafka.nodes=(hidden):9092
kafka.table-names=mario.test
kafka.hide-internal-columns=false

kafka.mario.test.json

{
    "tableName": "test",
    "schemaName": "mario",
    "topicName": "mario.test",
    "key": {
        "dataFormat": "raw",
        "fields": [
            {
                "name": "kafka_key",
                "dataFormat": "LONG",
                "type": "BIGINT",
                "hidden": "false"
            }
        ]
    }
}

I checked inside the container, and the files are mounted correctly.

Trino connects to Kafka successfully: I can run queries on the topic “mario.test” from the CLI.

The topic has the following structure:

  • The key is a String, but its content is an integer, e.g. “12345”
  • The value is a serialized Protobuf message
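Because the key holds the characters “12345” rather than a binary number, its bytes on the wire differ from those of a binary integer. This may matter for a key field declared with the raw decoder and "dataFormat": "LONG", which presumably expects a binary 8-byte value. The plain-Java sketch below (no Trino involved) prints both encodings. The big-endian layout is an assumption: it is ByteBuffer's default order and has not been checked against Trino's raw decoder. The class and method names are illustrative only.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class KeyBytesDemo {
    // Bytes of the number written as decimal text, e.g. "12345" -> 31 32 33 34 35.
    static byte[] asText(long value) {
        return Long.toString(value).getBytes(StandardCharsets.UTF_8);
    }

    // Bytes of the number as an 8-byte long in big-endian order (assumed binary layout).
    static byte[] asBigEndianLong(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    // Formats bytes as space-separated two-digit hex values.
    static String hex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            if (sb.length() > 0) {
                sb.append(' ');
            }
            sb.append(String.format("%02x", b & 0xff));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println("text key:   " + hex(asText(12345L)));          // 31 32 33 34 35
        System.out.println("binary key: " + hex(asBigEndianLong(12345L))); // 00 00 00 00 00 00 30 39
        // A client recovers the integer from the text key by parsing it.
        String keyText = new String(asText(12345L), StandardCharsets.UTF_8);
        System.out.println("parsed:     " + Long.parseLong(keyText));      // 12345
    }
}
```

The two byte sequences share nothing, so a decoder that expects one format will not read the other correctly.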

I have two problems:

  1. kafka.mario.test.json is not picked up for some reason: the column “kafka_key” is never created. It’s not relevant for the moment, but I wanted to mention it anyway.
  2. When I try to parse the Protobuf value, I get an exception.

This is the code for reference:

try (
    Connection conn = DriverManager.getConnection(url, properties);
    Statement stmt = conn.createStatement();
    ResultSet rs = stmt.executeQuery("SELECT _key AS key, _partition_id AS partition, _message AS value FROM kafka.mario.test LIMIT 1");
) {
    log.info("Iterating on result set...");
    // Extract data from result set
    while (rs.next()) {
        log.info("{}", rs.getBigDecimal("partition"));
        log.info("{}", rs.getString("key"));
        final String valueString = rs.getString("value");
        final ScheduledJob value = ScheduledJob.parseFrom(valueString.getBytes(StandardCharsets.UTF_8)); // EXCEPTION THROWN
        log.info("{}", value);
    }
} catch (SQLException e) {
    e.printStackTrace();
}
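The snippet reads the binary Protobuf value as a String and converts it back with getBytes(StandardCharsets.UTF_8). This round trip is lossy: any byte sequence that is not valid UTF-8 gets replaced with U+FFFD during decoding, so the bytes handed to parseFrom are no longer the original message. The sketch below demonstrates this in plain Java on three hand-written Protobuf wire bytes (field 1 = 150). Also note that, as far as the Trino Kafka connector documentation describes it, `_message` is a VARCHAR holding the message bytes as UTF-8 text, so a similar conversion may already happen before the client ever sees the value. The class name is illustrative only.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class Utf8RoundTripDemo {
    // Decodes bytes as UTF-8 text and re-encodes them, mirroring
    // rs.getString(...) followed by getBytes(StandardCharsets.UTF_8).
    static byte[] roundTrip(byte[] original) {
        String asText = new String(original, StandardCharsets.UTF_8); // malformed input -> U+FFFD
        return asText.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Protobuf wire bytes for "field 1 = 150": tag 0x08, varint 0x96 0x01.
        byte[] original = {0x08, (byte) 0x96, 0x01};
        byte[] afterRoundTrip = roundTrip(original);

        System.out.println(Arrays.toString(original));               // [8, -106, 1]
        System.out.println(Arrays.toString(afterRoundTrip));         // [8, -17, -65, -67, 1]
        System.out.println(Arrays.equals(original, afterRoundTrip)); // false
    }
}
```

The lone 0x96 byte is not valid UTF-8, so it comes back as the three-byte replacement character EF BF BD. Text-only messages survive the round trip; binary Protobuf messages in general do not.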

The error message is:

Exception in thread "main" com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero).
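To see what this error refers to: in the Protobuf wire format, every field starts with a varint tag equal to (field_number << 3) | wire_type, and field number 0 is never valid. As far as I know, protobuf-java reports a tag whose field number is zero with exactly this “invalid tag (zero)” message, which is what happens when the parser lands on bytes that are not a real Protobuf message. The sketch below decodes only the first tag of a byte array. It is not a full parser, and the class and method names are illustrative only.

```java
class ProtobufTagDemo {
    // Reads the base-128 varint at the start of the array (7 data bits per byte,
    // high bit set means "more bytes follow").
    static long readFirstTag(byte[] bytes) {
        long result = 0;
        int shift = 0;
        for (byte b : bytes) {
            result |= (long) (b & 0x7f) << shift;
            if ((b & 0x80) == 0) {
                return result;
            }
            shift += 7;
            if (shift >= 64) {
                throw new IllegalArgumentException("varint too long");
            }
        }
        throw new IllegalArgumentException("truncated varint");
    }

    // Splits the first tag into field number and wire type.
    static String describeFirstTag(byte[] bytes) {
        long tag = readFirstTag(bytes);
        long fieldNumber = tag >>> 3; // upper bits: field number
        long wireType = tag & 0x7;    // lower 3 bits: wire type
        if (fieldNumber == 0) {
            return "invalid tag (field number zero)";
        }
        return "field " + fieldNumber + ", wire type " + wireType;
    }

    public static void main(String[] args) {
        System.out.println(describeFirstTag(new byte[] {0x08, (byte) 0x96, 0x01})); // field 1, wire type 0
        System.out.println(describeFirstTag(new byte[] {0x00, 0x08}));              // invalid tag (field number zero)
    }
}
```

A valid message such as 08 96 01 starts with a sensible tag, while input beginning with a 0x00 byte (or otherwise corrupted) fails immediately.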

I can’t find a way to deserialize the value properly, and as far as I could read, Trino doesn’t offer any Protobuf support. I hope someone can help me.