Project Longbow
Written by Lubomír Slivka
This is part of a series about FlexQuery and the Longbow engine powering it.
When we set out to refresh the foundation for GoodData’s analytics stack, we discovered the Apache Arrow project and realized that its various components could both guide and advance our efforts.
Combined with other open source technologies that integrate efficiently with Arrow, we were confident we could build a strong and flexible layer of data services that would solve analytics use cases for our clients.
Very early on, we saw that we should not just take 'a bunch of technologies', glue them together, make it work, and be done with it. While that definitely works for many types of projects, we are in it for the long haul and wanted a more systematic approach to building our foundation. So the Longbow project was born, and today it forms the backbone of GoodData's FlexQuery.
In this article, I will explain the ideas and architecture behind Longbow and how it uses Apache Arrow and Arrow Flight RPC. If you'd like to learn more about FlexQuery, check out Building a Modern Data Service Layer with Apache Arrow.
Motivation
With Project Longbow, we set out to create a framework for building data services powered by Apache Arrow and Flight RPC.
One of our primary goals was to create a system that lets its clients easily perform different tasks (driven purely by the request payload) that leverage the capabilities of one or more data services: to compose tasks out of smaller, elementary operations performed by the different data services.
The motivation behind this goal was very pragmatic: product requirements always change, and in our domain that typically means changing what we do with the data. Ideally, we want to be in a position where new requirements can be addressed either by using the existing data services in different ways (e.g., changing the request payload) or by plugging in a new data service and using it in conjunction with the existing ones.
We realized that the diligent application of principles behind the Arrow Flight RPC is one of the keys to building a flexible and cohesive layer of data services.
Thus, one of the key parts of Project Longbow is a prescriptive, repeatable approach for building and operating Flight RPC data services that can 'play together.' On top of this, we built a set of ‘core’ services - data source connectors, ad-hoc SQL Query service, data frame processing service, and finally, cache and pre-aggregation store.
In this article, I'll go more in-depth into the architecture of Longbow and how it uses and leverages Arrow and Flight RPC. In case you are unfamiliar with Flight RPC, I have also put together an introductory article that explains the fundamentals.
Longbow Introduction
One of the goals of Longbow is to provide a prescriptive, repeatable way to build and operate Flight RPC data services that can 'play together' as they handle different types of requests for analytical processing.
On top of this, Longbow delivers a set of ‘core’ services that are typically involved in analytical processing:
- Connector Service to talk to different types of data sources.
- SQL Query Service that runs SQL on top of arbitrary Arrow data.
- Dataframe Service that manipulates or enriches arbitrary Arrow data via Pandas dataframes.
- Cache and Pre-aggregation storage service for arbitrary Arrow data.
With just these four basic services, Longbow can address several different use cases in analytics processing - all by simply composing the request differently:
- Compute a result by querying the warehouse, then cache the result for repeated reads.
- Compute an intermediate result by querying the warehouse, enrich it by applying machine learning using dataframe operations, then cache the final result for repeated reads.
  - It's just a matter of tweaking the request payload to make Longbow also cache the intermediate result.
- Read raw Arrow data from a non-SQL data source (say CSV on a filesystem, Excel, arbitrary APIs), apply an SQL query on top of it, then cache the result for repeated reads.
  - Again, it's just a matter of payload composition to make Longbow also cache the raw data or to pass the SQL query result on for post-processing using dataframe operations.
- Compute sub-results from multiple different data sources, feed them into the SQL Query service, prepare a single result, enrich that result using ML algorithms applied in the Dataframe service, and finally cache the result.
Furthermore, if we find that we are missing some type of service, Longbow makes it easy to create one - the framework lets us focus on the service logic itself, and once that is done, we add the new service to the existing ensemble.
Having a foundation with this level of flexibility carries over to the rest of the analytics stack and, in the end, into the product, its features, and ultimately the end users.
So let us now dive into how Longbow is architected and built.
Architecture
We have designed Longbow as a modular monolith - check out this nice article if you want to learn more about this type of architecture.
At the core of Longbow’s architecture is a modular Flight RPC server that can run any number of Longbow modules. Each module then implements one or more different data services.
The server is designed so that it can run either standalone or as part of a distributed system - a Longbow Cluster - in which different nodes work on top of a shared state. The server takes on a lot of 'boring' responsibilities, such as:
- Connection to the Longbow Cluster
- Data services registration
- Routing and dispatching Flight RPC calls to data services
- Connection to secure credential store (HashiCorp Vault)
- Health-checking infrastructure
- Common logging infrastructure
- Exposing monitoring metrics
- Regular maintenance tasks
Thanks to the modular monolith architecture, we can support different types and scales of deployments. We can deploy data services in a microservices model where each Longbow module runs standalone in multiple replicas, or we can deploy a single server that runs all the data services - and several variants in between.
For example, when we were recently rolling out new CSV analytics features for which we built two new data services, we deployed those new data services together in two replicas - while the rest of the production cluster ran in the microservices model.
Our main motivation for going with a modular monolith was deployment flexibility: being able to go from smaller 'condensed' deployments all the way to all-out microservices is essential for us and our business.
Longbow Cluster
Running Longbow as a distributed system is a necessity for anything beyond small deployments; a Longbow Cluster consists of multiple replicas of different types of nodes.
These nodes share and work with the following state:
- Cluster-wide configuration.
- Cluster node and data service registry.
- Catalog of Flights (identified by flight paths) available in the cluster.
  - The catalog contains only the essential metadata about the flight.
Indeed - the Flight RPC concept of flights identified by flight paths is a first-class citizen in Longbow. The Longbow Cluster provides all the essential functionality to work with and manage Flights identified by a flight path.
Flights identified by a path are typically used for data that is materialized once and then read many times by different data services and/or Longbow clients. These are typically various types of caches or pre-aggregations and are heavily used across different scenarios in analytical processing.
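To make this more concrete, here is a minimal sketch of materializing a cache under a flight path using the stock pyarrow.flight client. The node address, port, and path naming scheme are made up for illustration and are not Longbow's actual conventions.

```python
# A minimal sketch: caching an Arrow table under a flight path with pyarrow.flight.
import pyarrow as pa
import pyarrow.flight as flight

table = pa.table({"region": ["EMEA", "NA"], "revenue": [1200.5, 980.0]})

# In a real Longbow cluster the client first asks (via a custom action) which
# node accepts writes for this path; here we assume we already know the node.
client = flight.FlightClient("grpc://longbow-node-1:4317")

descriptor = flight.FlightDescriptor.for_path("org-123", "cache", "result-abc")
writer, _metadata = client.do_put(descriptor, table.schema)
writer.write_table(table)
writer.close()

# Once materialized, any node can resolve the path and guide readers to the data.
info = client.get_flight_info(descriptor)
print(info.total_records, [ep.locations for ep in info.endpoints])
```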
There is way more we could write about how Flights identified by path are end-to-end managed in Longbow; it is quite a juicy topic so we will save that for another article later on.
General Principles
To keep things organized and under control, we have established several principles for the Longbow cluster and the nodes that are part of it:
- All changes to the cluster state must always be communicated using events.
- Any node in the cluster can equally well answer requests about cluster state.
- All coordination between nodes is done on top of the cluster state and/or events that describe its change.
- Data services running on the nodes can still talk to each other using Flight RPC in order to invoke other services, read and write data.
- Nodes do not forward data on behalf of the client - when a client wants to invoke a service, or write or read data, it must go to the correct node.
These principles have several implications; in the context of Flight RPC and data services the main implications and benefits are:
- Any node in the cluster can answer the GetFlightInfo request for Flights described by a path.
- Any node in the cluster can tell the client (via a custom Flight RPC action) where a particular data service runs, so that the client can invoke the GetFlightInfo request for Flights described by a command on the correct node.
- Any node in the cluster can tell the client which node to contact to perform DoPut and write a new Flight described by a path.
In the first two cases, the FlightInfo returned by GetFlightInfo can 'guide' the client to the nodes where it should pick up the data.
There are additional 'boring' implications that both nodes and clients have to handle in this two-step dispatch setup (where GetFlightInfo or a custom action is used to obtain the nodes to contact). Most notably, nodes can reject 'misplaced' requests, and clients must be ready for that and re-drive the flow - this can happen as the cluster changes and new nodes join or existing nodes leave.
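Here is a minimal client-side sketch of this two-step flow using only the stock pyarrow.flight API. The node address and the retry policy are illustrative; real Longbow clients additionally use custom actions and the extended error details described later.

```python
# Two-step dispatch from the client's point of view: ask any node where the
# data lives, then fetch it from the node(s) holding it, re-driving on failure.
import pyarrow.flight as flight

def read_flight(any_node_uri: str, descriptor: flight.FlightDescriptor, retries: int = 3):
    for attempt in range(retries):
        try:
            # Step 1: any node can answer GetFlightInfo for a flight path.
            info = flight.FlightClient(any_node_uri).get_flight_info(descriptor)
            tables = []
            # Step 2: the returned endpoints guide us to the right node(s).
            for endpoint in info.endpoints:
                reader = flight.FlightClient(endpoint.locations[0]).do_get(endpoint.ticket)
                tables.append(reader.read_all())
            return tables
        except flight.FlightUnavailableError:
            # The cluster may have changed under us (nodes joined or left);
            # re-drive the whole flow from step 1.
            if attempt == retries - 1:
                raise
```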
Abstracting the cluster
At design time, we made a conscious decision and effort to keep the details of clustering separate from the Longbow modules and data services. The clustering implementation is encapsulated and hidden behind domain-specific interfaces carefully tailored to Longbow's requirements.
The Longbow modules and data services only ever use these interfaces, so in the end they do not care how clustering is realized - as long as the contracts of the clustering interfaces are met, everything 'just works'.
While we do not anticipate changing the clustering implementation too often, this design plays well into the modular monolith (a sketch of such an interface follows the list below):
- The simplest deployments, where we run all Longbow services on one server, use a specialized 'clustering' implementation that works purely in memory, with the option to keep persistent state in an SQLite database.
- Clustered deployments with high availability requirements run multiple Longbow servers on top of etcd (more on this later).
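For illustration only, this is roughly what hiding clustering behind a domain-specific interface can look like; the names and methods below are hypothetical and not Longbow's actual API.

```python
# A purely illustrative sketch: modules program against a small, domain-specific
# interface; how it is backed (in-memory, SQLite, etcd) is an implementation detail.
from typing import Optional, Protocol

class FlightCatalog(Protocol):
    """What modules see: a catalog of flight paths, regardless of the backend."""
    def register_flight(self, path: str, metadata: dict) -> None: ...
    def lookup_flight(self, path: str) -> Optional[dict]: ...

class InMemoryCatalog:
    """All-in-one deployments: a plain dict, optionally persisted to SQLite."""
    def __init__(self) -> None:
        self._flights: dict[str, dict] = {}

    def register_flight(self, path: str, metadata: dict) -> None:
        self._flights[path] = metadata

    def lookup_flight(self, path: str) -> Optional[dict]:
        return self._flights.get(path)

class EtcdCatalog:
    """Clustered deployments: the same contract, backed by etcd (see the next section)."""
    ...
```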
While having this flexibility is important for production deployments, it is also extremely useful for development: spinning up an all-in-one Longbow server running all data services is great for straightforward automated end-to-end testing and for testing on developer workstations - and we leverage this heavily.
Clustering made easier with etcd
Distributed systems are not easy, so we approached Longbow clustering with great respect and went through several iterations before settling on the real clustering implementation.
In the end, we implemented the Longbow cluster on top of etcd. etcd has a very solid track record (it is used in Kubernetes and its many large installations) and, importantly, its data model and features are a perfect match for what we wanted to accomplish.
etcd has all the features necessary to build solid and sound coordination on top of state that has to be shared within a distributed system. I'm not going to dive too deep into etcd at this point, as that is a topic worth its own article. Let me just highlight a few key features that we use heavily:
- MVCC model with key versioning
- Concept of leases which can be used to bind key lifecycles
- Watching for key changes -> eventing
- Support for transactions (modifying multiple keys based on some conditions)
With etcd and its features, we built a cluster implementation following the Command-Query Responsibility Segregation (CQRS) pattern (a simplified sketch follows the list):
- All updates of the cluster state (such as writing a new flight path) are done using etcd transactions that move the cluster from one consistent state to another.
- These updates result in events being produced by etcd.
- Based on the events from etcd, each Longbow node builds and maintains its own view of the cluster state - most importantly, the catalog of available flight paths.
- All read requests - such as GetFlightInfo - are served from these local views, without an additional round-trip to etcd.
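To make the pattern concrete, here is a simplified sketch using the python-etcd3 client. The client library, key layout, payload format, and threading are our own illustrative choices, not Longbow's implementation.

```python
# CQRS on etcd: transactional writes, plus a watch-driven local read view.
import json
import threading
import etcd3

etcd = etcd3.client(host="etcd", port=2379)
catalog: dict[str, dict] = {}  # local read view: flight path -> metadata

def register_flight(path: str, metadata: dict) -> bool:
    """Write side: create the flight path only if it does not exist yet."""
    key = f"/longbow/flights/{path}"
    succeeded, _responses = etcd.transaction(
        compare=[etcd.transactions.version(key) == 0],   # key must not exist yet
        success=[etcd.transactions.put(key, json.dumps(metadata))],
        failure=[],
    )
    return succeeded

def watch_flights() -> None:
    """Read side: each node keeps its own catalog view updated from etcd events."""
    events, _cancel = etcd.watch_prefix("/longbow/flights/")
    for event in events:
        path = event.key.decode().removeprefix("/longbow/flights/")
        if isinstance(event, etcd3.events.PutEvent):
            catalog[path] = json.loads(event.value)
        else:  # DeleteEvent
            catalog.pop(path, None)

threading.Thread(target=watch_flights, daemon=True).start()
# GetFlightInfo-style lookups are then served from `catalog`, with no etcd round-trip.
```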
Longbow uses this pattern for performance reasons; the lookups for flight paths are very frequent:
- Every attempt to read an existing flight path (typically containing a cached result or intermediate result) using Flight RPC’s GetFlightInfo -> DoGet flow needs to perform a lookup.
- As Longbow's services work together, they often coordinate and exchange materialized data via Flights described by their paths, so they have to do catalog lookups internally as well.
We measured in our production systems that flight catalog lookups (done for many different and valid reasons) happen almost 10x more often than actual reads of data. The CQRS pattern reduces the duration of these calls drastically.
Now, the tricky part about CQRS is that it makes the read side of the system eventually consistent - this complicates the life of clients that want to store a Flight under a path and reliably read it back. To this end, Longbow comes with low-overhead mechanisms to support read-after-write consistency.
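We save the details of Longbow's mechanism for a later post. Purely as an illustration of one common approach (not necessarily the one Longbow uses), a miss in the local view can fall back to a strongly consistent read from etcd:

```python
# One generic read-after-write technique: serve from the local CQRS view, but
# fall back to an authoritative etcd read when the watcher has not caught up yet.
import json
import etcd3

def lookup_with_read_through(etcd: etcd3.Etcd3Client, catalog: dict, path: str):
    metadata = catalog.get(path)                      # fast path: local view
    if metadata is None:
        value, _kv = etcd.get(f"/longbow/flights/{path}")  # consistent read
        if value is not None:
            metadata = json.loads(value)
            catalog[path] = metadata                  # warm the local view
    return metadata
```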
We will describe this and other interesting things related to etcd usage in one of the following posts.
Flight RPC in Longbow
One of the things we wanted to simplify was the creation of production-ready Flight RPC data services that can ‘play together’ in order to perform different types of analytical processing.
The modular monolith architecture and Longbow cluster I outlined in previous sections establish the ‘playground’ and some fundamental rules. But there are additional facets that we had to address. I’ll go over those briefly in the next sections.
Modular Flight RPC Server
We have created a single, configurable, and production-ready implementation of the Flight RPC server that is used to run Longbow nodes. Using configuration, administrators can influence what modules - and thus Flight RPC data services - run on that node.
The Longbow server builds on top of Arrow’s Flight RPC infrastructure and solves several ‘boring’ tasks:
- Controlled startup, establishing and maintaining the connection to the Longbow Cluster
- Graceful or abnormal shutdown
- Handling the lifecycle of the Longbow modules
  - Dynamic loading, module initialization, startup, and shutdown
  - Registering services implemented in the modules to the cluster
- Running the actual Flight RPC server itself and routing calls to the appropriate modules & data services
- Applying back pressure to protect the server from overload
- Accessing secret values (stored in environment variables, secrets files, or HashiCorp Vault)
- Facilitating and exposing health check results
- Structured logging and monitoring infrastructure
- Implementation of common Flight RPC operations that should be supported on all nodes
With all this handled in one place, nowadays we only need to write new Longbow modules that realize new services; all the boilerplate related to running a service in production is already taken care of.
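As an illustrative sketch (not Longbow's actual code), a modular server of this kind can be built on pyarrow's FlightServerBase: one server process routes each call to a pluggable module, and per-node configuration decides which modules are loaded. The envelope format and module API below are made up.

```python
# A modular Flight RPC server sketch: route calls to modules by a command envelope.
import json
import pyarrow.flight as flight

class LongbowishServer(flight.FlightServerBase):   # hypothetical name
    def __init__(self, location: str, modules: dict):
        super().__init__(location)
        # modules: service name -> object implementing get_flight_info / do_get
        self._modules = modules

    def _route(self, command: bytes):
        envelope = json.loads(command)               # hypothetical command envelope
        return self._modules[envelope["service"]], envelope["payload"]

    def get_flight_info(self, context, descriptor):
        # Path descriptors and the other Flight RPC verbs are omitted for brevity.
        module, payload = self._route(descriptor.command)
        return module.get_flight_info(context, payload)

    def do_get(self, context, ticket):
        module, payload = self._route(ticket.ticket)
        return module.do_get(context, payload)

# A node's configuration decides which modules it runs, e.g.:
# LongbowishServer("grpc://0.0.0.0:4317", {"sql": SqlModule(), "cache": CacheModule()}).serve()
```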
Flight RPC extensions
Longbow leverages the openness of Flight RPC and declares its own set of extendable payloads on top of it.
When it comes to Flight Descriptors and Tickets, there are a few well-defined things:
- Command Envelope that should be used for all calls where the FlightDescriptor contains a command.
  - This envelope contains the information necessary for routing the command to the desired data service running somewhere in the Longbow cluster.
  - Similar to the Flight RPC descriptor itself, the actual contents of the envelope are opaque and can hold any payload.
- Ticket types that should be returned in FlightInfo.
  - Again, the ticket types prescribe the essential information needed to route the call to a particular data service running on a Longbow node.
- Extended error information structure.
  - Longbow comes with its own, more fine-grained set of error codes that also allows clients to programmatically decide how to retry failed calls.
There are also several custom Flight RPC actions; the two most notable are listed below (a client-side sketch follows):
- GetLocations action that clients can use to determine the nodes where GetFlightInfo for a particular descriptor should be done.
  - For descriptors that contain a command, this returns a list of locations where the requested service runs.
  - For descriptors that contain a Flight path, this returns a list of locations where data for that path may be written using DoPut. Bear in mind that, thanks to the general principles of the Longbow cluster, GetFlightInfo for a path can be called on any node and will guide the client to a location where the existing data can be picked up.
- HealthCheck action, which exposes the health status.
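Here is a minimal sketch of calling such a custom action from a client. The action name matches the text above, but the request/response encoding shown (JSON) is an assumption, not Longbow's actual wire format.

```python
# Calling a custom Flight RPC action with pyarrow.flight.
import json
import pyarrow.flight as flight

client = flight.FlightClient("grpc://longbow-node-1:4317")

request = json.dumps({"service": "sql-query"}).encode()
for result in client.do_action(flight.Action("GetLocations", request)):
    locations = json.loads(result.body.to_pybytes())
    print("sql-query runs on:", locations)
```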
We also had to extend the contract of Flight RPC GetFlightInfo to accommodate long-running requests.
Flight RPC's contract for GetFlightInfo is that when a command is included, the payload should be used to generate the Flight data - in other words, it is a service invocation. Oftentimes, the service involves some sort of queuing, and on top of that, once the work actually runs, it may take a while. In such situations, clients typically want to poll for completion or cancel the long-running work.
At the time, Flight RPC did not explicitly cover long-running work - this was recently addressed by introducing PollInfo. We had to do without it, and instead have a contract where GetFlightInfo calls for long-running tasks may end with a FlightTimedOut error that includes retry information in the extra error detail structures.
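From the client's side, the contract boils down to a polling loop like the sketch below. FlightTimedOutError is the stock pyarrow exception; how the retry hint is encoded in the error details is Longbow-specific and simplified here to a fixed delay.

```python
# Client-side handling of long-running GetFlightInfo calls.
import time
import pyarrow.flight as flight

def get_flight_info_waiting(client: flight.FlightClient,
                            descriptor: flight.FlightDescriptor,
                            poll_delay_s: float = 2.0) -> flight.FlightInfo:
    while True:
        try:
            return client.get_flight_info(descriptor)
        except flight.FlightTimedOutError:
            # The work is still queued or running; back off and poll again.
            time.sleep(poll_delay_s)
```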
Service Payload templates
On top of the common envelopes used in Flight descriptors and the common ticket types, Longbow comes with templates for service payloads. A service may or may not use them; Longbow's core services stick to them because these templates prescribe how a particular service can play with others.
The template for a service payload that takes multiple input Flights, does something with the data, and produces a result always consists of the following sections:
- List of inputs
  - Each input has a name unique within the scope of the payload.
  - Each input specifies either a FlightDescriptor to be used to obtain the data or, for convenience, a flight path.
  - And here is the fun part: the FlightDescriptor can again contain a command that asks for another service invocation - a service whose payload is derived from the same template.
- Sink definition
  - This tells the service what to do with the result.
  - The result may be kept locally and consumed once, directly on the node that produced it.
  - Or the result data may be sunk to a particular flight path, from where it can be read repeatedly. The client may indicate what to do if that sink flight path already exists - whether the existence should be treated as an error or whether it is OK to just use the existing result and skip any processing.
- Result metadata modifications
  - Very often, the client wants to ensure that the produced Arrow data includes some metadata, either in the Arrow Schema or on fields in the schema.
  - Through the result metadata modifications, the client can influence the metadata included in the result.
Services that only produce data (typically connectors to data sources) have similar templates - they just omit the list of inputs.
Naturally, each service adds its own specific parts on top of these templates. A service that runs SQL on arbitrary Arrow data contains the SQL itself and the parameters. A service that runs Dataframe operations using pandas contains information about what operations to run and the parameters for each operation.
There are two things I would like to highlight:
- The recursive nature of the payload: the inputs can contain FlightDescriptors that describe calls to other data services. In the end, this allows the client to compose a complex request that is satisfied by calling those other data services.
- The sink-to-flight-path and result reuse essentially allow for transparent caching. When a service finds that a flight path mentioned in the sink already exists, it will immediately short-circuit (a cache hit) and return FlightInfo guiding the caller to the existing flight path. An example of a composed payload follows below.
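For illustration, here is what a composed payload following this template could look like. All field names, path names, and the envelope layout are made up for the example; only the overall shape (inputs, sink, metadata modifications, service-specific part) follows the text above.

```python
# An illustrative SQL Query service payload composed from the template.
sql_request = {
    "inputs": {
        # Input obtained by invoking another service (a connector) - note the
        # recursion: its descriptor is again a command built from a template.
        "orders": {"command": {
            "service": "connector",
            "payload": {"datasource": "postgres-prod", "table": "orders",
                        "sink": {"flight_path": "org-123/cache/orders-raw"}},
        }},
        # Input that already exists as a materialized Flight under a path.
        "fx_rates": {"flight_path": "org-123/cache/fx-rates-2024"},
    },
    # Where to put the result; existing data means a transparent cache hit.
    "sink": {"flight_path": "org-123/cache/orders-in-eur", "if_exists": "reuse"},
    # Ask for extra metadata on the result schema.
    "metadata_modifications": {"schema": {"source": "sql-query"}},
    # Service-specific part: the SQL itself plus any parameters.
    "sql": "SELECT o.*, o.amount * fx.rate AS amount_eur "
           "FROM orders o JOIN fx_rates fx ON o.currency = fx.currency",
}
```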
Module foundation
Finally, Longbow brings to the table a set of foundational components that the modules can use to build data services.
As we designed and built the different core services for Longbow, we kept running into the same concerns over and over: long-running tasks and their queuing, gathering input from the rest of the Longbow cluster, and writing results out to flight paths.
These repeating concerns made us converge on the service payload templates and then on a set of reusable code components that address them.
Longbow now also contains components that greatly simplify the creation of data services that generate new Flights using long-running tasks. So far, the majority of our services are like this, because in the end all of them need some sort of queuing to protect themselves from overload.
So now, when we want to create a new data service that generates data using long-running tasks, we reuse the existing infrastructure that deals with everything except the task itself. The infrastructure takes care of queuing and all the Flight RPC request handling - all we have to do is implement the service code in the task itself.
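Purely as a hypothetical sketch of what 'writing just the task' can look like - the base class, method names, and registration call below are invented for illustration and are not Longbow's actual module API:

```python
# A hypothetical long-running task: the framework handles queuing, Flight RPC
# handling, fetching inputs, and sinking the result; the module author only
# implements the transformation itself.
import pyarrow as pa

class CsvPreviewTask:
    def run(self, inputs: dict[str, pa.Table], params: dict) -> pa.Table:
        # Take the fetched input Flight and return the first N rows as the result.
        table = inputs["csv_data"]
        return table.slice(0, params.get("limit", 100))

# Hypothetical registration with the shared long-running-task infrastructure:
# longbow.register_task_service("csv-preview", CsvPreviewTask)
```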
We also have reusable components that address the intra-cluster Flight IO (getting Flights, writing Flight paths) so that these concerns are always tackled the same way across data services.
With all this in place, our engineers who build features can (mostly) forget about boilerplate and focus on building the data services themselves.
Closing words
So that's all for starters about the Longbow Project. We had a lot of fun building this technology, and we have now been using it in production for several months with no major incidents. Of course, nothing is perfect or works flawlessly the first time around.
There were some bugs in our code, and one or two bugs in Apache Arrow as well - but so far we are very happy with our choices, and we leverage Longbow, Arrow, and the other open-source technologies around them more and more.
I hope you found this article interesting and informative. If nothing else, I hope you found useful information about Flight RPC and how we use it in practice.
Want to learn more?
As we mentioned in the introduction, this is part of a series of articles, where we take you on a journey of how we built our new analytics stack on top of Apache Arrow and what we learned about it in the process.
Other parts of the series cover Building the Modern Data Service Layer, Flight RPC 101, details about flexible storage and caching, and last but not least, how well DuckDB quacks with Apache Arrow!
As you can see in this article, we are opening our platform to an external audience. We not only use (and contribute to) state-of-the-art open-source projects, but we also want to allow external developers to deploy their services into our platform. Ultimately, we are thinking about open-sourcing Longbow. Would you be interested? Let us know - your opinion matters!
If you’d like to discuss our analytics stack (or anything else), feel free to join our Slack community!
Want to see how well it all works in practice? You can try the GoodData free trial! Or if you'd like to try our new experimental features enabled by this new approach (AI, machine learning, and much more), feel free to sign up for our Labs Environment.