Over the last two days, I’ve highlighted how the Fuze backend data platform has evolved to meet growing customer needs. From communication data collection to business intelligence, we needed to rethink our system architecture to keep things running smoothly. When it came to reporting, we knew we were thorough, but how could we make sure our reports were up to date whenever someone requested them?
Making our Reporting Real-time
There came a point when we realized that we could never meet the need for real-time reports with the ETL-driven approach described above. We started a new project with the goal of producing near real-time reports. By then, there was already a body of literature and examples describing how real-time stream processing could solve this kind of problem.
The first thing we did was change the data source from log files to events. We modified the feature environment so that, in addition to writing log files, it emitted events to the network as they happened. This is critical, because it removes the wait for a scheduled job to collect the source data from log files. In our approach, emitted events are sent to Kafka. Many real-time data streaming architectures use Kafka as a queue for events that still need to be processed. This matters because the rate at which events are generated may at times exceed the rate at which they can be processed; the queue absorbs the difference. In the diagram above, generated events are sent to queues within Kafka on the left-hand side. On the right-hand side, Storm takes events from those queues and processes them. Unlike in a batch system, this processing happens as the events arrive. The logic that lives in the ETL layer of a batch-oriented system is implemented within Storm in this scenario. In this version, the Storm jobs wrote data into a dedicated set of real-time tables in our reporting database, so reports querying those tables showed real-time data.
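To make the queue-then-process flow concrete, here is a minimal in-process sketch in Python. It does not use the Kafka or Storm APIs: a `deque` stands in for a Kafka topic, a plain function stands in for a Storm job, and the event fields (`type`, `account`, `duration_s`), the `realtime_calls` table and the per-tick limit are all hypothetical. What it illustrates is that a burst of events simply accumulates in the queue and is drained by the consumer at its own pace, updating the real-time table incrementally.

```python
from collections import defaultdict, deque

# Stand-in for a Kafka topic: a FIFO buffer that absorbs bursts when events
# are produced faster than the consumer can process them.
topic = deque()

# Stand-in for the real-time reporting table: per-account call totals.
realtime_calls = defaultdict(lambda: {"calls": 0, "seconds": 0})

def emit(event):
    """Feature-environment side: publish the event as soon as it happens."""
    topic.append(event)

def process_available(max_events):
    """Consumer side: drain at most max_events, updating the table incrementally."""
    processed = 0
    while topic and processed < max_events:
        event = topic.popleft()
        if event["type"] == "call_ended":
            row = realtime_calls[event["account"]]
            row["calls"] += 1
            row["seconds"] += event["duration_s"]
        processed += 1
    return processed

# A burst of five events arrives faster than the consumer's two-per-tick rate.
for _ in range(5):
    emit({"type": "call_ended", "account": "acme", "duration_s": 60})

ticks = 0
while topic:
    process_available(max_events=2)
    ticks += 1

print(ticks, realtime_calls["acme"])  # 3 {'calls': 5, 'seconds': 300}
```

Because the consumer works incrementally, the table is usable after every tick rather than only after a scheduled batch job completes.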
A Proliferation of Data Integration Pipelines
It’s worth noting that while we achieved our real-time reporting objective, we didn’t replace the existing batch-oriented system. That system still powered the majority of our reporting and invoicing functions. So what we really did was add a second data integration pipeline to our environment, and several problems became apparent as we lived with this dual-pipeline setup. The first practical problem was that data transformation and load logic had to be duplicated in both the ETL branch and the real-time branch. Whenever we wanted to add a reporting feature, we had to add logic to both sides. There was no obvious way to share that logic, because one system was very SQL-centric and the other had its logic implemented in Java. On top of this, our end users started pointing out other problems. We began receiving bug reports about data differing between the batch-powered reports and the real-time-powered reports. When this happened, we had to work backwards from the report to the data source to find where the divergence was taking place. Sometimes the observed problem resulted from a bug in the transformation logic on either the batch or the real-time side. Sometimes the source data in the log files differed from the emitted events, and had to be fixed in the feature platform or accounted for in the transformation logic. This raised a deeper question: in the face of discrepancies, which one is the system of record?
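The duplication problem is easy to reproduce in miniature. The sketch below uses hypothetical data and a hypothetical "count completed calls" rule. It expresses the rule twice, once in SQL through Python's built-in `sqlite3` (standing in for the ETL side) and once imperatively (standing in for the Java side), lets one copy drift, and then reconciles the outputs per account. A per-key comparison like this is one way to narrow down where two pipelines diverge.

```python
import sqlite3
from collections import Counter

# Hypothetical source records: (call_id, account, status)
calls = [
    (1, "acme", "completed"),
    (2, "acme", "completed"),
    (3, "globex", "completed"),
    (4, "globex", "missed"),
]

# Batch branch: the rule expressed in SQL, as an ETL job would.
db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE calls (call_id INTEGER, account TEXT, status TEXT)")
db.executemany("INSERT INTO calls VALUES (?, ?, ?)", calls)
batch = dict(db.execute(
    "SELECT account, COUNT(*) FROM calls WHERE status = 'completed' GROUP BY account"
))

# Real-time branch: the "same" rule re-implemented in code. Suppose the rule
# was tightened in SQL but not here, so missed calls are still counted.
realtime = Counter()
for _call_id, account, status in calls:
    if status in ("completed", "missed"):  # drifted copy of the logic
        realtime[account] += 1

def reconcile(batch_counts, realtime_counts):
    """Return {account: (batch, realtime)} for every account where the pipelines disagree."""
    accounts = set(batch_counts) | set(realtime_counts)
    return {
        a: (batch_counts.get(a, 0), realtime_counts.get(a, 0))
        for a in sorted(accounts)
        if batch_counts.get(a, 0) != realtime_counts.get(a, 0)
    }

print(reconcile(batch, realtime))  # {'globex': (1, 2)}
```

In a real system each side is far larger than one rule, which is what makes keeping two copies in sync so costly.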
Towards a Lambda Architecture?
One possible solution to the problems of the dual-stream scenario described above is the so-called lambda architecture. It has many high-profile users, including Twitter, where Storm was developed. In this approach, you consolidate all of the source data into a single stream. For us, this means moving entirely to event-based data. That eliminates discrepancies between batch-derived and real-time-derived data that stem from differences in the source data, because everyone is working off the same source data. This single stream of source data is fed into a Kafka queue. Data is then processed out of the queue by two handlers, often referred to as the batch layer and the speed layer. The batch layer is analogous to the ETL stream from the examples above. Many implementations use HDFS and MapReduce-based approaches for the batch layer, and the resulting data is often written to HDFS.
The speed layer is similar to the real-time approach described earlier. Storm jobs consume data off the Kafka queue as it comes in, process it as it arrives, and also write the results to HDFS. The key point here is that both the batch and real-time handlers write data to the same HDFS target system, with the aim of creating a single system of record. In the reporting layer, you show batch-derived data whenever possible, and fall back to real-time data only when batch data isn’t available yet. When new batch-derived data arrives, it supersedes the real-time-derived data as the authoritative version.
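The "batch wins whenever it is available" rule in the reporting layer can be sketched in a few lines, assuming both layers produce values keyed by a time bucket. The bucket keys, counts and the `serve` function are illustrative, not taken from any specific lambda implementation.

```python
from typing import Dict, Tuple

def serve(batch_view: Dict[str, int],
          realtime_view: Dict[str, int]) -> Dict[str, Tuple[int, str]]:
    """Merge per-bucket results, preferring batch-derived values.

    Real-time values only fill in buckets the batch layer has not produced yet.
    """
    merged = {bucket: (value, "realtime") for bucket, value in realtime_view.items()}
    # Batch output is authoritative: it overwrites any real-time value for the same bucket.
    merged.update({bucket: (value, "batch") for bucket, value in batch_view.items()})
    return dict(sorted(merged.items()))

# Hypothetical hourly call counts: batch has covered hours 09 and 10,
# the speed layer has results for 10 and 11.
batch_view = {"09": 120, "10": 98}
realtime_view = {"10": 97, "11": 42}
print(serve(batch_view, realtime_view))
# {'09': (120, 'batch'), '10': (98, 'batch'), '11': (42, 'realtime')}

# Once the next batch run covers hour 11, its value supersedes the real-time one.
batch_view["11"] = 44
print(serve(batch_view, realtime_view)["11"])  # (44, 'batch')
```

Tagging each value with its source also makes it easy for a report to show which figures are still provisional.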
The result is a single system of record that contains near real-time data, which is a real accomplishment. The problem is that you still have two different sets of transformation logic to maintain. This is a major drawback, especially given our experience troubleshooting our existing dual-stream implementation. Furthermore, we were starting to question whether a single data store could actually serve most of our query needs. It was becoming increasingly clear that we would need multiple data stores, each optimized for different use cases. So although we spent significant time considering this approach, in the end we chose a different, more real-time-centric one.
Towards a Real-time Architecture
The architecture we settled upon consolidates the batch and speed layers of the lambda architecture into a single real-time stream. Events generated from the feature environment are still sent to a Kafka queue, but from this point the design diverges from the lambda architecture described above. Raw events are persisted into HDFS so that we can replay them if we find problems with our data processing logic. The raw events in Kafka are processed with Flink, a stream processor similar to Storm. As raw events are processed, they are decorated with data looked up from other systems and written back to Kafka, creating a new queue of processed events. Different data consumers then subscribe to these processed event streams. The processed events are also persisted into HDFS in case we need to replay them at a later date, for example to populate a new data store with historical data beyond what is retained in the Kafka queue.
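A rough sketch of this single processing path follows, written as plain Python rather than against Flink's API. Lists stand in for the Kafka topics and the HDFS archives, and `USER_DIRECTORY`, the event fields and the function names are hypothetical. One consumer persists raw events untouched, while the other decorates each event with looked-up data, publishes it to the processed topic, and persists the processed copy as well.

```python
from typing import Dict, List

# Hypothetical in-memory stand-ins: plain lists play the role of Kafka topics
# and of the HDFS locations where events are persisted.
raw_topic: List[dict] = []
processed_topic: List[dict] = []
raw_archive: List[dict] = []
processed_archive: List[dict] = []

# Hypothetical reference data owned by another system (e.g. a user directory).
USER_DIRECTORY: Dict[str, dict] = {
    "u-17": {"account": "acme", "region": "us-east"},
}

def archive_raw(events: List[dict]) -> None:
    """Consumer 1: persist raw events unchanged so they can be replayed later."""
    raw_archive.extend(events)

def enrich(event: dict) -> dict:
    """Decorate a raw event with looked-up attributes; unknown users are tagged, not dropped."""
    extra = USER_DIRECTORY.get(event["user_id"], {"account": None, "region": "unknown"})
    return {**event, **extra}  # builds a new dict, so the raw event is not mutated

def process(events: List[dict]) -> None:
    """Consumer 2: the single transformation path."""
    for event in events:
        decorated = enrich(event)
        processed_topic.append(decorated)    # downstream stores subscribe to this topic
        processed_archive.append(decorated)  # kept for backfilling new stores later

raw_topic.extend([
    {"type": "call_ended", "user_id": "u-17", "duration_s": 60},
    {"type": "message_sent", "user_id": "u-99"},
])
archive_raw(raw_topic)
process(raw_topic)

print(processed_topic[0]["account"], processed_topic[1]["region"])  # acme unknown
```

Keeping the raw and processed archives separate means a bug in `enrich` can be fixed and replayed from the raw copy, while a new store can be backfilled directly from the processed copy.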
The first benefit of this approach is that it avoids the duplication of data transformation logic inherent in the lambda architecture. One set of logic is, by definition, much easier to maintain than two. It is also well suited to the reality of having multiple consuming data stores, each optimized for a particular purpose. We can continue to use a columnar data warehouse for aggregations, analysis, and invoicing. We can use HBase for specific query scenarios related to calls and messages, and Elasticsearch for full-text search of messages. We expect the number of use-case-optimized data stores to keep growing over time. Another important point is that the consumers all pull data from Kafka rather than having data pushed to them. This is a subtle point, but in our experience we prefer pull-based architectures to push-based ones. If a downstream reporting system fails or goes offline, it catches up on its own once it comes back online, rather than someone having to work out how to reload the data it missed while it was down.
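The catch-up behavior of pull-based consumers can be sketched with an append-only log and per-consumer offsets. This is a simplified, hypothetical model in Python, not Kafka's consumer API (in Kafka, consumers track their position per partition and commit offsets); the `Log` and `Consumer` classes and the two toy stores are illustrative. While the search store is offline the log keeps accumulating records, and when it returns it resumes from its own offset.

```python
from collections import defaultdict
from typing import Callable, Dict, List, Set

class Log:
    """Append-only log standing in for a Kafka topic: records stay readable by offset."""

    def __init__(self) -> None:
        self.records: List[dict] = []

    def append(self, record: dict) -> None:
        self.records.append(record)

    def read(self, offset: int, max_records: int = 100) -> List[dict]:
        return self.records[offset:offset + max_records]

class Consumer:
    """A downstream store that pulls from the log and remembers its own position."""

    def __init__(self, apply: Callable[[dict], None]) -> None:
        self.apply = apply   # how this particular store indexes a record
        self.offset = 0      # next offset to read
        self.online = True

    def poll(self, log: Log) -> int:
        if not self.online:
            return 0         # nothing is lost: the records wait in the log
        batch = log.read(self.offset)
        for record in batch:
            self.apply(record)
        self.offset += len(batch)
        return len(batch)

# Two hypothetical use-case-specific stores fed from the same processed stream.
minutes_by_account: Dict[str, int] = defaultdict(int)
search_index: Dict[str, Set[int]] = defaultdict(set)

def index_minutes(rec: dict) -> None:
    if rec["type"] == "call":
        minutes_by_account[rec["account"]] += rec["minutes"]

def index_text(rec: dict) -> None:
    if rec["type"] == "message":
        for word in rec["text"].lower().split():
            search_index[word].add(rec["id"])

log = Log()
warehouse = Consumer(index_minutes)
search = Consumer(index_text)

log.append({"type": "call", "account": "acme", "minutes": 5})
for c in (warehouse, search):
    c.poll(log)

search.online = False  # the search store goes down...
log.append({"type": "message", "id": 1, "text": "Quarterly report ready"})
log.append({"type": "call", "account": "acme", "minutes": 3})
for c in (warehouse, search):
    c.poll(log)
print(warehouse.offset, search.offset)  # 3 1

search.online = True   # ...and catches up by itself when it returns
search.poll(log)
print(search.offset, sorted(search_index["report"]))  # 3 [1]
```

Because each store owns its offset, nobody has to track which records a failed store missed; the position itself records that.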
So what are the downsides to this approach?
One thing that becomes more difficult is replaying and repairing data. Data problems are invariably going to surface over time. Some of them may originate in upstream data-producing systems; others may come from bugs in the data processing and transformation layer. Either way, the question becomes: what do you do after you have fixed the problem? How do you repair the data in the downstream reporting systems, or do you bother to do so at all? Here I would say the batch-oriented approach targeting a system-of-record database has some advantages: you can simply re-run the batch job to repopulate or overwrite the existing data. This problem is exactly why we persist all raw and processed events in HDFS, so that we can perform repair operations after the fact.
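The repair workflow that such an archive enables can be sketched as replaying archived raw events through a corrected transformation and rebuilding the downstream table from scratch. The sketch below is hypothetical: the archive is an in-memory list, and the millisecond-conversion bug, the transform functions and the `rebuild` helper are invented for illustration.

```python
from collections import defaultdict

# Hypothetical archive of raw events (standing in for files persisted to HDFS).
raw_archive = [
    {"account": "acme", "duration_ms": 90_000},
    {"account": "acme", "duration_ms": 30_000},
    {"account": "globex", "duration_ms": 60_000},
]

def transform_v1(event):
    # Bug: divides milliseconds by 60, as if they were seconds.
    return event["account"], event["duration_ms"] / 60

def transform_v2(event):
    # Fix: convert milliseconds to minutes.
    return event["account"], event["duration_ms"] / 60_000

def rebuild(archive, transform):
    """Recompute a per-account minutes table by replaying every archived event."""
    table = defaultdict(float)
    for event in archive:
        account, minutes = transform(event)
        table[account] += minutes
    return dict(table)

broken = rebuild(raw_archive, transform_v1)
repaired = rebuild(raw_archive, transform_v2)
print(broken)    # {'acme': 2000.0, 'globex': 1000.0}
print(repaired)  # {'acme': 2.0, 'globex': 1.0}
```

Rebuilding from scratch, rather than patching individual rows, keeps the repair logic identical to the normal processing path; the cost is reprocessing the whole archived range.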
The other downside is that you end up with many copies of your data across all these different data stores. We accept this as the price of being able to bring the right query engine to bear on each particular purpose. The idea that a single database can meet 80% of query needs no longer holds. If anything, things seem to be heading in the other direction, and there will be more divergence before any kind of convergence happens.
There is no one-size-fits-all approach to defining a data architecture and strategy. Many of the architecture and technology choices depend heavily on the data you are working with and what your query requirements look like. But some principles from our experience stand out as worth considering. Creating a single data integration pipeline that data consumers can subscribe to is significantly simpler than maintaining multiple data integration pipelines. There is also a lot of benefit in making that single pipeline real-time rather than batch-oriented. We believe there will be a need for multiple data stores with optimized query engines for the foreseeable future; we tend to think of these different systems as indices into our data. The data integration approach you choose should support this reality.
If you are interested in reading more on these topics, I want to point you to a couple of additional resources. Our evolving thinking about data has been heavily influenced by approaches taken at LinkedIn. In particular, Jay Kreps, one of the original authors of Kafka, has written a blog post that is required reading here at Fuze.
If you are interested in a lambda architecture at scale, check out this blog post from Twitter.
And of course, stop by and visit me today if you’ll be attending Internet of Things Expo! Drop by my presentation to keep this conversation going.