Arrow Flight RPC 101

Written by Lubomír Slivka

This is part of a series about FlexQuery and the Longbow engine powering it.

In this article, I will walk you through Flight RPC, a fundamental part of our Longbow engine, which I describe in the Project Longbow article.

Flight RPC is an API tailored for data services. It can be used to implement different services - the usual suspects: producers, consumers, transformers, and everything in between. It is built on gRPC and comes with ready-made and performance-optimized infrastructure - you do not have to care about the technicalities of streaming data in or out of the services.

Now, even though the Flight RPC specification is short, it took us some time to grasp and apply it - not because it is complicated or overly complex, but because we wanted to apply it correctly. In the following sections, I will try to explain some key Flight RPC concepts in layman’s terms and provide extra information on top of what is in the official documentation.

The Flight abstraction

The Flight RPC uses the ‘Flight’ abstraction to represent 'some data'. Each flight has a Flight Descriptor - which essentially tells either 'what' data to get or 'how' to get the data. Flight RPC comes with two subtypes of flight descriptors: path descriptor (what) and command descriptor (how).
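
To make this concrete, here is what the two descriptor flavors look like in the Python Flight RPC implementation from pyarrow - the path segments and the command payload are made up for illustration:

```python
import pyarrow.flight as flight

# A path descriptor: "what" data to get, expressed as path segments.
path_descriptor = flight.FlightDescriptor.for_path(
    "my_user1", "trainingData", "some-unique-id"
)

# A command descriptor: "how" to get the data - an opaque byte string
# that only your service needs to understand.
command_descriptor = flight.FlightDescriptor.for_command(b'{"sql": "SELECT 1"}')
```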

Paths

Path descriptors specify the flight - the data - via its “flight path.” You can view this as a path-like identifier of the data. That is, the flight path does not necessarily have to be some kind of opaque identifier - it is something that the service can parse and alter its processing accordingly. Flight RPC does not put any constraints on what should or should not be in the flight path - it is completely up to the implementation to decide.

For example, you can have flight paths that look like 'trainingData/' and your service would interpret this as “this is a piece of training data and it has some unique identifier”. The service can treat this as semantic information and behave accordingly.

Or, as another example, you can have flight paths that look like 'my_user1/trainingData/' and your service would interpret this as: this data belongs to my_user1, it is training data, and it has this unique identifier.

The flights described by a flight path can be used to work with materialized data, and the paths can carry semantic information.

Commands

Command descriptors specify the flight - i.e., the data - using an arbitrary payload that a data service can understand and based on which it can “somehow” produce (or, in the parlance of Flight RPC, “generate”) or consume the data.

The Flight RPC does not care what the command looks like or what it contains. From Flight RPC’s perspective, the command is a byte string - it is up to your services to understand and deal with it. The command may be anything from a simple string saying “do it” to a complex JSON or Protobuf message serialized into bytes.

For example, you may have a service that can run an SQL SELECT on some data source. You can design the payload for that service as a JSON containing the data source's URL, SQL statement text, and SQL parameters. Your data service receives a request to get the flight described by this payload. The code parses & validates the input and then proceeds with running the SQL.
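
Sketched with pyarrow, such an exchange could look like this - the JSON shape and field names are illustrative, not any standard:

```python
import json

import pyarrow.flight as flight

# Client side: build a command payload for the hypothetical SQL service.
command = json.dumps({
    "data_source_url": "postgresql://example-host/db",  # made-up field names
    "sql": "SELECT * FROM events WHERE day = $1",
    "params": ["2024-01-01"],
}).encode("utf-8")

descriptor = flight.FlightDescriptor.for_command(command)

# Server side: parse & validate the payload before running the SQL.
payload = json.loads(descriptor.command)
```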

You can view commands as payloads used to invoke your custom data services.

Reading data

With Flight RPC, clients should get the Flight data by first calling the GetFlightInfo and then using the returned FlightInfo to actually read the data using a DoGet call.

Here is where things get interesting. Clients call GetFlightInfo and provide the flight descriptor - which contains either a path or a command:

  • For flight paths, the server typically returns details of where to access the materialized data.
  • For commands, the GetFlightInfo call is actually the service invocation - this is where the service should perform all the work necessary to produce the data.

In the end, the FlightInfo contains the following information:

  • Endpoints (or partitions) that make up the flight data.
  • Locations within each endpoint, where replicas are stored.
  • A ticket for each endpoint, which the client must use to read the data from the available locations.
  • Arrow schema describing the data (optional).
  • Data size (optional).

The endpoints and locations are quite straightforward: they describe data partitions and for each partition, there is a list of replicas.

But what is the ticket? From the Flight RPC perspective, it is an opaque byte string that needs to be presented at the location to actually read the data. So similarly to the commands, your services can put just about anything in there - as long as the content allows the server to stream the right piece of data.
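
Here is a minimal sketch of a server assembling such a FlightInfo with pyarrow - the schema, tickets, and node addresses are all invented for illustration:

```python
import pyarrow as pa
import pyarrow.flight as flight

class PartitionedService(flight.FlightServerBase):
    def get_flight_info(self, context, descriptor):
        # The schema of the flight data (optional in principle; see below).
        schema = pa.schema([("id", pa.int64()), ("value", pa.float64())])
        # One endpoint per partition; each endpoint lists the replica
        # locations plus an opaque ticket to present at any of them.
        endpoints = [
            flight.FlightEndpoint(
                b"partition-0", ["grpc://node-a:4317", "grpc://node-b:4317"]
            ),
            flight.FlightEndpoint(b"partition-1", ["grpc://node-b:4317"]),
        ]
        return flight.FlightInfo(schema, descriptor, endpoints,
                                 total_records=-1, total_bytes=-1)
```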

Now that the client code has the FlightInfo, it can proceed to the right locations to get the data for the different endpoints by making DoGet calls - either serially or in parallel; this really depends on the client code.

The DoGet will open a stream of Arrow data. It is important to note that the stream itself carries the schema - so even if the initial GetFlightInfo call for whatever reason does not return a schema, the client will learn the shape of the data at the time it reads it.
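
A client-side read could then look like this in pyarrow (the address and flight path are hypothetical):

```python
import pyarrow.flight as flight

client = flight.FlightClient("grpc://query-service:4317")  # illustrative address
info = client.get_flight_info(
    flight.FlightDescriptor.for_path("trainingData", "some-unique-id")
)

# Read each endpoint (partition) serially; the loop could be parallelized.
tables = []
for endpoint in info.endpoints:
    # Pick one of the replica locations and present the endpoint's ticket there.
    reader = flight.FlightClient(endpoint.locations[0]).do_get(endpoint.ticket)
    tables.append(reader.read_all())  # the stream itself carries the schema
```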

While the Arrow schema is optional in FlightInfo, many Flight RPC implementations require that it is always included. We found that in some services it can be really hard to accurately produce the schema at GetFlightInfo time, so where the implementation requires a schema, our code sends an empty schema with a metadata marker.
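
For illustration, such a placeholder can be produced like this - the metadata key is purely our own convention, and the name here is made up:

```python
import pyarrow as pa

# An empty schema with a metadata marker telling our clients that the
# real schema will only be known at DoGet time.
placeholder_schema = pa.schema([], metadata={b"x-schema-unknown": b"true"})
```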

Benefits of a cohesive system

The layer of indirection between GetFlightInfo and DoGet is very valuable, especially when the system has multiple cooperating data services.

It can be useful, for example, to implement gateways or transparent caching. Imagine two services: a 'query' service that queries data from a database, and a 'cache' service that can store materialized data under particular flight paths.

This would then work in the following order:

  • The 'query' data service accepts GetFlightInfo for a command.
  • The 'query' service checks whether a flight path with the cached result already exists.
    • If it exists, the 'query' service returns FlightInfo that navigates the client to read the materialized data from the 'cache' service.
    • If it does not exist, the 'query' service runs the necessary query, serves the data directly, and creates the cache in the background (see the sketch below).
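
A hedged sketch of how the 'query' service's GetFlightInfo could implement this with pyarrow - the helper methods and service addresses are hypothetical stand-ins:

```python
import pyarrow.flight as flight

class QueryService(flight.FlightServerBase):
    def get_flight_info(self, context, descriptor):
        # _cache_path_for, _cache_exists, _schema_for and _schedule_cache_fill
        # are hypothetical helpers of this sketch, not Flight RPC APIs.
        cache_path = self._cache_path_for(descriptor.command)  # returns bytes
        if self._cache_exists(cache_path):
            # Navigate the client to the materialized data in the 'cache' service.
            endpoint = flight.FlightEndpoint(cache_path,
                                             ["grpc://cache-service:4317"])
        else:
            # Serve the data directly and materialize the cache in the background.
            endpoint = flight.FlightEndpoint(descriptor.command,
                                             ["grpc://query-service:4317"])
            self._schedule_cache_fill(descriptor.command, cache_path)
        return flight.FlightInfo(self._schema_for(descriptor), descriptor,
                                 [endpoint], total_records=-1, total_bytes=-1)
```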

Note that there are many reasons why the 'query' service would not find cached data. Naturally, there is the cache-miss scenario, but apart from that, the 'query' service may be accessing a real-time data source where caching is undesirable, or caching may not be possible at all due to compliance requirements.

Either way, the client does not care. The client is interested in some data and does not care where it gets it from. A system with correctly designed GetFlightInfo, FlightInfo, and tickets allows this.

Shortcuts

The indirection of GetFlightInfo -> DoGet methods may be cumbersome or even unnecessary for some services - typically simple, standalone data services.

In those cases, it is possible to ‘bend’ the Flight RPC to simplify things - while still benefiting from the existing client and server infrastructure provided by the Apache Arrow project.

Let’s take for example a basic single-node service that just hosts some data and allows clients to read it in a single stream. For such a service, you can completely ignore the GetFlightInfo and only use DoGet. The ticket that clients have to pass to the DoGet can contain the payload necessary to identify the data to stream. The payload can be anything. It may be a simple identifier of the data or a structured payload.
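
A minimal sketch of such a DoGet-only service in pyarrow (the in-memory table store is just for illustration):

```python
import pyarrow.flight as flight

class SimpleDataService(flight.FlightServerBase):
    """A standalone service: no GetFlightInfo, clients go straight to DoGet."""

    def __init__(self, location="grpc://0.0.0.0:4317"):
        super().__init__(location)
        self._tables = {}  # maps ticket bytes to a hosted Arrow table

    def do_get(self, context, ticket):
        # The ticket payload is simply an identifier of the hosted data.
        return flight.RecordBatchStream(self._tables[ticket.ticket])

# A client then reads in a single step, with no GetFlightInfo round trip:
# flight.FlightClient("grpc://host:4317").do_get(flight.Ticket(b"my-data")).read_all()
```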

Writing data

When clients want to write data to a service, they use the DoPut method.

The DoPut accepts a FlightDescriptor and then opens a bi-directional stream between the server and the client. Through this stream, the client can send Arrow data to write and receive responses from the server.

With DoPut, you can use descriptors containing a flight path to write data under that path. The typical use case here is a service that caches or stores data that the client ‘somehow’ obtains and wants to access later.

Doing DoPut with a descriptor that contains a command can be used to implement more complex writes - for example, performing bulk writes of data into a data warehouse. In this case, the command payload would carry the statement to execute.
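
For the flight-path case, a client-side write could look like this in pyarrow (address, path, and data are illustrative):

```python
import pyarrow as pa
import pyarrow.flight as flight

client = flight.FlightClient("grpc://cache-service:4317")  # illustrative address
table = pa.table({"id": [1, 2, 3], "value": [0.1, 0.2, 0.3]})

# Write under a flight path so the data can be read back later.
descriptor = flight.FlightDescriptor.for_path(
    "my_user1", "trainingData", "some-unique-id"
)
writer, metadata_reader = client.do_put(descriptor, table.schema)
writer.write_table(table)
writer.close()
```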

Complex usage

The basic use of DoPut is fairly simple and straightforward. However, on its own, it may not be sufficient to handle more complex use cases - take for instance parallel upload of multiple data partitions.

In such cases, your data services will have to implement additional “Custom Actions” that the client will use on top of the DoPut.

For example, your data service can have a StartParallelUpload action to initiate and a FinishParallelUpload action to finalize the parallel upload of a data set. A client would first call StartParallelUpload, then make as many parallel DoPut calls as necessary (to create the partitions - or endpoints, in the parlance of Flight RPC), and once all partitions were uploaded, call FinishParallelUpload to finalize the upload.
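
Sketched with pyarrow - reusing the serialized descriptor as the action body is just an assumption of this example, as is the address:

```python
import pyarrow.flight as flight

client = flight.FlightClient("grpc://storage-service:4317")  # illustrative address
descriptor = flight.FlightDescriptor.for_path("my_user1", "dataset")

# list() drains the result iterator so the action call actually completes.
list(client.do_action(flight.Action("StartParallelUpload", descriptor.serialize())))

# ... as many parallel DoPut calls as needed, one per partition/endpoint ...

list(client.do_action(flight.Action("FinishParallelUpload", descriptor.serialize())))
```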

Custom Actions

More often than not, your data service will have some custom requirements that cannot be addressed by the existing Flight RPC methods. To accommodate this, Flight RPC allows you to 'plug in' arbitrary new actions.

You can use these for anything your services need. For example, you can use the custom actions during more complex data operations that involve multiple DoPut/DoGet calls, you can use them for administering the service, implementing health checks, or improving maintainability.

The infrastructure takes care of the transport concerns, and your code can focus on the action logic itself - assigning action names and, optionally, designing the action body and action result and how they should be serialized.

Similar to command descriptors or tickets, the structure and serialization of the action body and result are up to you. A typical choice is either JSON or Protocol Buffers.

However, it is also good to keep in mind that some Flight RPC types - such as FlightDescriptor - are also serializable and could be used for action body or result; this can be useful if your action is directly related to the flight entity itself.

An example from our analytics stack: We have a custom action that tells clients where to perform DoPut. The client calls the custom action with the same FlightDescriptor they would use for DoPut itself. The result of this custom action is a list of locations that the client should write to.
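
A sketch of how such an action handler could look in pyarrow - the action name 'WhereToPut' and the helper picking locations are hypothetical stand-ins for ours:

```python
import pyarrow.flight as flight

class StorageService(flight.FlightServerBase):
    def do_action(self, context, action):
        if action.type == "WhereToPut":  # hypothetical action name
            # The body is the same serialized FlightDescriptor the client
            # would pass to DoPut itself.
            descriptor = flight.FlightDescriptor.deserialize(action.body.to_pybytes())
            for location in self._pick_write_locations(descriptor):  # hypothetical helper
                yield flight.Result(location.encode("utf-8"))
        else:
            raise flight.FlightServerError(f"unknown action {action.type!r}")

    def list_actions(self, context):
        return [("WhereToPut", "Return the locations a client should DoPut to.")]
```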

Offloading Compute

Apart from supporting data reads and writes, Flight RPC also has the DoExchange operation, which your services can offer to clients so that they can offload computation.

The usage is pretty straightforward:

  • The client calls DoExchange with FlightDescriptor; this will typically contain a command with payload describing the compute.
  • The client streams data in.
  • The server performs the transformation.
  • The client reads the result.

This is all achieved using a single DoExchange call and a single bi-directional stream prepared by the Flight RPC infrastructure.
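
In pyarrow, the whole round trip fits in a few lines - the address and the command payload describing the compute are illustrative:

```python
import pyarrow as pa
import pyarrow.flight as flight

client = flight.FlightClient("grpc://transform-service:4317")  # illustrative address
input_table = pa.table({"x": [1.0, 2.0, 3.0]})
descriptor = flight.FlightDescriptor.for_command(b'{"op": "normalize"}')

writer, reader = client.do_exchange(descriptor)
writer.begin(input_table.schema)   # declare the schema of the data streamed in
writer.write_table(input_table)
writer.done_writing()              # signal that the input is complete
result = reader.read_all()         # read the transformed data back
writer.close()
```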

DoExchange for inter-process compute offloading

In our analytics stack, we do not have any data services that offer the DoExchange for clients. We have, however, found it very helpful in multi-process services that require inter-process communication.

One of our Python data services allows clients to generate new flights by performing manipulation using the Pandas dataframe library.

Running ‘pandas as a service’ gets tricky for many reasons - a big one lies in Python itself: the Global Interpreter Lock (GIL). For many operations, Pandas holds the GIL while doing CPU-intensive work - effectively ‘taking time’ the server needs to do other work. On busy servers, this can lead to nasty things such as increased latencies, failing health checks, and/or failing liveness probes.

To solve this, we have designed our Pandas data service so that it spawns multiple worker processes. Each process runs its own Flight RPC server listening on a Unix socket. When the server receives a request to generate data, it will offload the computation to the worker process.

The server finds the input data, initiates DoExchange with the worker, streams the input data to the worker, and then waits for the results, which it then streams out.

Errors

Flight RPC and its infrastructure come with a predefined set of errors that the server may raise on different occasions - the infrastructure will take care of error propagation between the server and the client.

You will find the ‘usual’ set of exceptions such as Unauthenticated, Unauthorized, ServerError, InternalError, UnavailableError, and others.

What we have found while building a more complex system with Flight RPC is that on their own, these built-in errors are not enough to implement more robust error handling strategies.

Thankfully, error handling in Flight RPC is also extensible. While it is not possible to plug in arbitrary error types, it is possible to attach additional, custom information to the existing errors.

Similar to commands or tickets, the errors can also contain a custom binary payload where your server can put whatever it wants - like a serialized Protocol Buffer message.

So, for example, in our case, all our services are contracted to raise Flight RPC errors with this custom binary payload attached. The payload is a Protocol Buffer message with an error code and additional error details.

The clients always look for this attached payload and, if present, deserialize it and perform error handling according to the error code included in the message. If there is no payload attached, the client can be certain that something is really wrong on the server, because errors without our custom payload can only ever be raised by the Flight RPC infrastructure itself, before our server code is even involved.
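
A sketch of both sides in pyarrow - the payload bytes and the client-side handler are placeholders for the protobuf machinery described above:

```python
import pyarrow.flight as flight

# Server side: attach the custom binary payload to a built-in error.
class FailingService(flight.FlightServerBase):
    def do_get(self, context, ticket):
        details = b"..."  # stands in for our serialized protobuf message
        raise flight.FlightServerError("query failed", extra_info=details)

# Client side: recover the payload from the caught error.
client = flight.FlightClient("grpc://data-service:4317")  # illustrative address
try:
    client.do_get(flight.Ticket(b"some-data")).read_all()
except flight.FlightError as exc:
    if exc.extra_info:
        handle_error_payload(exc.extra_info)  # hypothetical application handler
    else:
        # No payload: raised by the Flight RPC infrastructure itself,
        # before our server code was even involved.
        raise
```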

Wrapping Up

I hope this article helped you learn a bit more about the Flight RPC and the various ways it can be used and extended.

From my almost two years of experience working with and designing against Flight RPC, I can wholeheartedly recommend it if you are planning to build data services that work with data in the Arrow format.

Flight RPC, while somewhat opinionated, still gives you a lot of freedom to either bend or extend it to match your needs. And the opinionated parts are solid - they are something you start to appreciate as you build more complex services or a set of cooperating services.

Another big selling point is the existing client-server infrastructure provided by the Apache Arrow project - you do not have to design and build your own and can instead rely on the optimized infrastructure developed by the community.

Last but not least, you can use Apache Arrow in a dozen languages, from low-level ones like C++ and Rust to high-level ones like Python and JavaScript.

Want to learn more?

As we mentioned in the introduction, this is part of a series of articles, where we take you on a journey of how we built our new analytics stack on top of Apache Arrow and what we learned about it in the process.

Other parts of the series cover the Building of the Modern Data Service Layer, Project Longbow, details about the flexible storage and caching, and, last but not least, how well DuckDB quacks with Apache Arrow!

As you can see in this article, we are opening our platform to an external audience. We not only use (and contribute to) state-of-the-art open-source projects, but we also want to allow external developers to deploy their services into our platform. Ultimately, we are thinking about open-sourcing Longbow. Would you be interested in it? Let us know - your opinion matters!

If you’d like to discuss our analytics stack (or anything else), feel free to join our Slack community!

Want to see how well it all works in practice? You can try the GoodData free trial! Or, if you’d like to try our new experimental features enabled by this new approach (AI, machine learning, and much more), feel free to sign up for our Labs Environment.
