Building a data warehouse on the Hadoop platform
A data warehouse on a Big Data platform is a use case many organizations are exploring. The reason is usually one of the many kinds of flexibility a big data platform offers.
- Big Data for enterprise-wide analytics - Many organizations are moving to the concept of a data hub or data lake, where data is gathered and stored by source system rather than in project-based or subject-area-based data warehouses. The advantage of this approach is that marts can be remodeled and redesigned at any time as requirements change.
- Cost - Hadoop has become a cheap alternative storage medium.
- Faster analytics - A Big Data platform (Hadoop being the common moniker for big data) can take care of traditional batch systems as well as near-real-time decision support (an action is taken 2 seconds to 2 minutes after data delivery) and near-real-time event processing (an action is taken 100 milliseconds to 2 seconds after data delivery).
- Disparate data sources that a traditional warehouse could not support
- High data volume - Volumes of data too large to be handled by an RDBMS
- Flexibility for data exploration - The ability to discover and explore data that was previously underused or unused
This page shows one approach to converting a traditional warehouse to a warehouse on a Big Data platform. The Hadoop ecosystem is very wide, and the range of technologies available within it makes it difficult to choose which tool to use for a given use case. The other side of the coin is that this gives the flexibility to try out tools based on the use case and on other considerations such as performance, operations and support.
This demo makes use of open source technologies available on the Hadoop stack, namely HDFS and Hive.
HDFS - The core of Hadoop: a distributed file system that enables everything else Hadoop does. Hadoop can work with other storage systems such as Amazon S3, but when we talk about Hadoop, HDFS is the usual name for the file system. Most Hadoop distribution vendors, such as Cloudera, IBM BigInsights and Hortonworks, use HDFS as their file system.
Hive - Hive is an Apache project that introduced the concepts of databases, tables and SQL to the big data world. Hive is still a work in progress. Hive uses HDFS to store data. Beyond bringing the flexibility of SQL to a file system, Hive is used for its metastore for HDFS. The metastore is a database that stores information about the files in HDFS: their structure, partitions, buckets, and the serialization and deserialization needed to read and write them. This enables Hive to query files based on the metadata held in the metastore. To summarize, Hive provides data abstraction and data discovery over files.
"How is the performance of Hive"
Hive's performance depends on many factors. These include the best practices normally followed in the relational database world as well as Hadoop-specific considerations such as file storage format and compression technique.
To simplify, let us look at what happens when a query is run on Hive.
- The query is submitted to Hive through a UI.
- The Hive driver receives the query and works on an execute-and-fetch model.
- Hive's compiler reads the metastore to get the table details: the structure and the serialization and deserialization needed to read and write the files. The compiler then creates an execution plan. Following in the footsteps of established DBMSs, Hive uses cost-based optimization (rule-based and correlation optimizers are also available). It generates a plan with multiple stages based on the cost of each operation.
- Hive's execution engine executes the plan created by the compiler. It plans and manages the dependencies between the stages. Each stage is a map and reduce operation of Hadoop MapReduce. The output of each stage is written to temporary files and used by subsequent stages. The output of the final stage is written to the table's file location.
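The plan and its stages can be inspected with Hive's EXPLAIN statement. This is a minimal sketch that assumes a hypothetical table named sales_demo with columns item_name and item_qty; neither is part of this article's model.

```sql
-- Hypothetical table, used only for illustration.
CREATE TABLE IF NOT EXISTS sales_demo (item_name STRING, item_qty BIGINT);

-- EXPLAIN prints the stage dependencies and the operators inside each stage
-- without actually running the query.
EXPLAIN
SELECT item_name, SUM(item_qty) AS total_qty
FROM sales_demo
GROUP BY item_name;
```

The exact shape of the output depends on the Hive version and on the execution engine in use.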
Looking at the execution pattern above, Hive tables can be designed in many ways to optimize performance, but each approach has pros and cons. The decision has to be made according to the priority of the requirements.
Data storage options
- File format - Hadoop supports multiple file formats. Files can be stored as flat files, as Hadoop sequence files, or in more complex but option-rich formats such as ORC, RC and Parquet. Hive supports all of these storage formats, but consideration should also be given to the other ways this data will be accessed. If Hive is the only access path, any of the formats will work, and the option-rich formats generally give faster query execution. If other tools such as ETL tools are used to access the files, there is no guarantee that the tool supports every format.
- Compression - Compression is another way to optimize queries. Multiple compression codecs such as Gzip and Snappy are available for Hive. The downside of Gzip is that its files are not splittable. Codecs have different attributes: some compress faster and need less CPU, while others produce splittable files and need more CPU. Splittability of files on Hadoop is a significant factor when it comes to performance.
- How data is stored or organized - Organizing data in Hadoop is a discipline. The arrangement of data has a significant impact on querying. If a query targets one section of the data, we need to make sure it hits only that section and not the entire data set. Data can be organized on a transaction-level attribute, such as the type of data or the timestamp of the event.
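File format and compression are declared per table or per session in Hive. The sketch below uses hypothetical table names and the movie columns that appear later in this article; which format and codec suit a given cluster is an assumption to validate, not a recommendation.

```sql
-- Columnar ORC table with Snappy compression (table name is hypothetical).
CREATE TABLE movies_orc (
  movie_id        BIGINT,
  movie_name      STRING,
  year_of_release INT,
  rating          DOUBLE
)
STORED AS ORC
TBLPROPERTIES ("orc.compress" = "SNAPPY");

-- Plain delimited text table, readable by most external tools.
CREATE TABLE movies_text (
  movie_id        BIGINT,
  movie_name      STRING,
  year_of_release INT,
  rating          DOUBLE
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE;

-- Compress the files written by later INSERT statements in this session.
-- Gzip-compressed text files are not splittable.
SET hive.exec.compress.output=true;
SET mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec;
```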
Data hub folder structure
The generic folder pattern for storing data in a data hub is based on the source system.
/Source System/Type of data/Transaction Date
A data hub can also be organized by subject area rather than by source system.
/Credit System/Risk Data/Transaction Date
This decision has to be taken at the enterprise level, because synergy between programs and projects is needed to achieve this at enterprise scale.
Let us take a peek at how files and folders can be created based on source system.
For example, suppose we have files with movie information such as movie_id, movie_name, year_of_release (2016) and rating from Rotten Tomatoes and IMDb. The files can be arranged this way.
Similarly, if we have movie reviews from these vendors with information such as movie_id, review_id, individual_rating and comment, they can be stored as
Now let us check a few possible scenarios for how this data is consumed.
- Get movies with a rating of more than 3
- Get the movie ids with the most reviews
- Get the count of movies released after 2014
These scenarios access only one file at a time, so no join between files is needed.
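As a rough sketch, the three scenarios could be written as follows, assuming the files are exposed as Hive tables named movies and reviews (hypothetical names) with the columns listed earlier.

```sql
-- Movies with a rating of more than 3.
SELECT movie_id, movie_name, rating
FROM movies
WHERE rating > 3;

-- Movie id with the most reviews (ties are not handled in this sketch).
SELECT movie_id, COUNT(*) AS review_count
FROM reviews
GROUP BY movie_id
ORDER BY review_count DESC
LIMIT 1;

-- Count of movies released after 2014.
SELECT COUNT(*) AS movie_count
FROM movies
WHERE year_of_release > 2014;
```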
Now let us look at another scenario.
- Get the movie name and other information for the movie with the most reviews
This is an example where data has to be joined between files.
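A possible form of that join, again assuming hypothetical Hive tables named movies and reviews over the two sets of files:

```sql
-- Count reviews per movie first, then join the counts back to the movie details.
SELECT m.movie_id,
       m.movie_name,
       m.year_of_release,
       m.rating,
       r.review_count
FROM movies m
JOIN (
  SELECT movie_id, COUNT(*) AS review_count
  FROM reviews
  GROUP BY movie_id
) r
  ON m.movie_id = r.movie_id
ORDER BY r.review_count DESC
LIMIT 1;
```

Aggregating before the join keeps the joined data small, which matters because the join is the expensive part of the query on Hadoop.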
Understanding the joins and the queries gives a better idea of how to store the data. To keep it generic, the above folder structure can be improved to
This folder pattern makes it possible to access all movies and reviews for the year 2016 without reading data from other years. The technique used in this example is called partitioning, here by year. This kind of data arrangement helps not only Hive but also other tools such as Impala and Spark that use Hive's metastore for metadata. The folder arrangement is comparable to partitions in a relational database, because Hive knows where to find the data rather than doing a full data scan.
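One way to map such folders to Hive partitions is an external table. In this sketch the paths under /datahub are hypothetical, and the files are assumed to be comma-separated with movie_id, movie_name and rating in that order.

```sql
-- External table: Hive records only metadata and leaves the files where they are.
CREATE EXTERNAL TABLE movies (
  movie_id   BIGINT,
  movie_name STRING,
  rating     DOUBLE
)
PARTITIONED BY (year_of_release INT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION '/datahub/rottentomatoes/movies';

-- Register the 2016 folder as a partition (hypothetical path).
ALTER TABLE movies ADD IF NOT EXISTS
  PARTITION (year_of_release = 2016)
  LOCATION '/datahub/rottentomatoes/movies/2016';

-- The filter on the partition column lets Hive read only the 2016 folder.
SELECT movie_id, movie_name
FROM movies
WHERE year_of_release = 2016;
```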
A data hub on Hadoop avoids the need to build the extra archive system that a traditional data warehouse requires. In a traditional DWH, the source files consumed by ETL are backed up and go offline after a retention period. Bringing the data back online is a significant effort, and in most cases it goes unused over time. The folder mechanism discussed above avoids the need for a separate archive mechanism. Since space is cheap on Hadoop compared to relational databases, this option works out well.
Can we do relational data modeling on Hadoop?
Relational data modeling is a very common use case when Hadoop is the chosen platform for an enterprise data warehouse. The answer to the question is that it is technically possible on the Hadoop platform, with the help of the Hive metastore. If we go one step back and look at the folder pattern designed above, it is quite obvious that the folders can be designed and arranged to follow the relational data model used on an RDBMS.
Data access on HDFS is "schema on read": irrespective of how and where data is stored, it can be read the way we want it to be read. In a traditional RDBMS, by contrast, the schema has to be modeled first and the data is then loaded into tables, which requires extensive analysis of the data and time and effort spent on an optimized data model. With Hadoop this effort is significantly reduced, because a model change does not require the data to be reloaded.
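As a small illustration of schema on read, two external tables can describe the same files in different ways without moving or reloading anything. The directory /datahub/sales and both table names are hypothetical, and the files are assumed to be comma-separated text.

```sql
-- Reading 1: each line as one string. With Hive's default field delimiter
-- (Ctrl-A), a comma-separated line lands entirely in the single column.
CREATE EXTERNAL TABLE sales_raw (line STRING)
LOCATION '/datahub/sales';

-- Reading 2: the same files parsed into typed columns.
CREATE EXTERNAL TABLE sales_parsed (
  customer_name  STRING,
  customer_id    BIGINT,
  customer_email STRING,
  customer_phone STRING,
  item_bought    STRING,
  item_quantity  INT,
  item_price     DOUBLE
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION '/datahub/sales';
```

Dropping either external table removes only its metadata; the files stay in place.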
To explain this, let us look at a sales file with the data fields customer name, customer id, customer email, customer phone number, item bought, item quantity and item price.
In a relational database this data can be normalized and split into customer information, product information and sales information, linked back through customer id, sales id and item id. This kind of model reduces redundancy, which is a major factor in an RDBMS because storage is costly. With the right indexes built, the joined queries perform well.
Customer information:
- Customer ID
- Customer Name
Product (item) information:
- Item ID
- Item Description
- Item Name
- Item Price
Sales information:
- Sales ID
- Customer ID
- Item ID
- Item Quantity
- Sales Date
The advantage of Hadoop is that we can keep the same logical model and implement it physically in multiple ways without data movement.
|CustomerID|CustomerName|                         Email|      Phone|ItemID|     ItemDesc| ItemName|ItemsPrice|SalesId|           SalesDate|ItemQty|
|       100|        John|             email@example.com|+1-99999999|  1000|  Product One|Product_1|      9.99|  S1000|2016-01-01 00:00:...|      2|
|       100|        John|firstname.lastname@example.org|+1-99999999|  1001|  Product Two|Product_2|     19.99|  S1000|2016-01-01 00:00:...|      2|
|       100|        John|             email@example.com|+1-99999999|  1002|Product Three|Product_3|     29.99|  S1000|2016-01-01 00:00:...|      2|
|       100|        John|firstname.lastname@example.org|+1-99999999|  1003| Product Four|Product_4|     39.99|  S1000|2016-01-01 00:00:...|      2|
|       200|        Jane|             email@example.com|+1-99998888|  1000|  Product One|Product_1|      9.99|  S1000|2016-01-02 00:00:...|      2|
|       200|        Jane|firstname.lastname@example.org|+1-99998888|  1001|  Product Two|Product_2|     19.99|  S1000|2016-01-02 00:00:...|      2|
|       200|        Jane|             email@example.com|+1-99998888|  1002|Product Three|Product_3|     29.99|  S1000|2016-01-02 00:00:...|      2|
|       200|        Jane|firstname.lastname@example.org|+1-99998888|  1003| Product Four|Product_4|     39.99|  S1000|2016-01-02 00:00:...|      2|
The above view shows the data in a denormalized format, which is convenient on a presentation layer or semantic layer where we need to run fact-based analytical functions.
For example, find the total quantity sold of Product One, or the total revenue from Product Two.
In a normalized data model this requires joining at least three tables. On Hadoop, joining data is an expensive operation because of the shuffle and the multiple disk I/O operations. On the Hadoop platform, performance is given priority over storage, because storage is cheaper.
Method 1 - Normalized Model
Physical implementation of normalized logical model
Let us implement the normalized logical model with a matching physical model. In this method, three tables are created:
CREATE TABLE Customer_Info(customerId BIGINT, CustomerName STRING, Email STRING,Phone STRING)
COMMENT 'A bucketed copy of customer_info'
CLUSTERED BY(customerId) INTO 256 BUCKETS;
CREATE TABLE Items_Info(ItemID BIGINT, ItemDesc STRING, ItemName STRING,ItemsPrice DOUBLE)
COMMENT 'A bucketed copy of items_info'
CLUSTERED BY(ItemID) INTO 256 BUCKETS;
CREATE TABLE Sales_Fact(SalesId BIGINT,customerId BIGINT,ItemID BIGINT, ItemQty BIGINT)
COMMENT 'A bucketed copy of sales fact'
PARTITIONED BY(SalesDate STRING)
CLUSTERED BY(customerId,ItemID) INTO 256 BUCKETS;
Data in these tables is bucketed on customerId and ItemID. Bucketing is a technique in which data is distributed based on a hash of the distribution key. Picking the distribution key is crucial, because we need to make sure the data is distributed evenly across the data nodes. You can see that the sales data is partitioned by sales date, which helps queries that access data only from certain days or a range of days, e.g. where salesDate='01/31/2016'.
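A sketch of loading and querying the partitioned, bucketed Sales_Fact table follows. The staging table sales_staging is hypothetical, and SalesDate is assumed here to be stored as a yyyy-MM-dd string.

```sql
-- Let Hive create partitions from the values in the data.
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
-- On Hive releases before 2.0, bucketing on insert also had to be enforced:
-- SET hive.enforce.bucketing=true;

-- The partition column must come last in the SELECT list.
INSERT OVERWRITE TABLE Sales_Fact PARTITION (SalesDate)
SELECT SalesId, customerId, ItemID, ItemQty, SalesDate
FROM sales_staging;

-- Only the partition for the requested day is read.
SELECT SalesId, customerId, ItemID, ItemQty
FROM Sales_Fact
WHERE SalesDate = '2016-01-31';
```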
Bucketing data on the same distribution key helps when joining. In the above model, most queries that need a join will use ItemID or customerId as the join predicate. When the tables are bucketed on the join key, rows with the same key fall into matching buckets, so the join does not need to shuffle or move that data over the network. This significantly reduces the join time. In the Hadoop MapReduce paradigm, joining on the map side takes less time than joining on the reduce side. Hive can detect the buckets and apply this optimization when bucket map join is enabled (hive.optimize.bucketmapjoin) and the bucket counts of the joined tables are multiples of each other.
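A hedged sketch of a three-table join over the normalized model is shown below. Whether Hive actually chooses a bucket map join depends on the bucketing columns matching the join keys and on the Hive version, so it is worth checking the plan with EXPLAIN; the date literal assumes SalesDate is stored as yyyy-MM-dd.

```sql
-- Allow Hive to use the bucket layout for map-side joins where it applies.
SET hive.optimize.bucketmapjoin=true;

-- Quantity and revenue per customer and item for one day of sales.
SELECT c.CustomerName,
       i.ItemName,
       SUM(s.ItemQty)                AS total_qty,
       SUM(s.ItemQty * i.ItemsPrice) AS total_revenue
FROM Sales_Fact s
JOIN Items_Info i    ON s.ItemID = i.ItemID
JOIN Customer_Info c ON s.customerId = c.customerId
WHERE s.SalesDate = '2016-01-31'
GROUP BY c.CustomerName, i.ItemName;
```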
The 256 in the above example is the number of buckets, which has to be determined from the data volume and the nodes available. A bucket should normally be a few HDFS blocks in size and should fit into memory. Having too many small buckets is not recommended.
Advantages:
- Useful when the data is deterministic and the data model works fine for the requirement
- Useful when the queries are static and predictable
- Easy migration from relational databases
Disadvantages:
- Cannot be used for dynamic queries or other analytical purposes
- A model change requires a change in ETL and a data reload
- Performance becomes an issue as the data grows and gets skewed. Constant redistribution of the data will be needed if the bucketing keys are not good enough.
When data is normalized and stored in a star schema, we also have to consider ACID guarantees for transactions. Technically, we should be able to insert, update and delete data. In HDFS, in-place modification of data is not easy, but other techniques can be used to work around that:
- Append data with a last_updated_ts instead of updating records
- Keep history data and latest data in two sets of files
- Use Hive itself to run updates and deletes (see "How to use Hive for CRUD - Run updates and deletes on Hive")
- Create a view on top of the history data that pulls only the latest data, rather than materializing it with the techniques mentioned above
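The append-and-view approach could look like the sketch below. The table customer_history and the view customer_latest are hypothetical names; the history table is assumed to receive a new row with a fresh last_updated_ts for every change to a customer.

```sql
-- Append-only history table (hypothetical).
CREATE TABLE IF NOT EXISTS customer_history (
  customerId      BIGINT,
  CustomerName    STRING,
  Email           STRING,
  Phone           STRING,
  last_updated_ts TIMESTAMP
);

-- The view exposes only the most recent row for each customer.
CREATE VIEW customer_latest AS
SELECT customerId, CustomerName, Email, Phone, last_updated_ts
FROM (
  SELECT customerId, CustomerName, Email, Phone, last_updated_ts,
         ROW_NUMBER() OVER (PARTITION BY customerId
                            ORDER BY last_updated_ts DESC) AS rn
  FROM customer_history
) ranked
WHERE rn = 1;
```

Because the view is not materialized, the ranking is recomputed every time it is queried.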
Method 2 - Denormalized data
The second method flattens the data after adding all the attributes that might be needed in a query. This can be compared to flat HBase tables with column families. In this method we have just one table for all our needs. The advantage of denormalized data is that the code can read one bigger chunk of data rather than reading the same data from multiple small chunks.
CREATE TABLE Sales_Fact_Flat(customerId BIGINT, CustomerName STRING, Email STRING,Phone STRING,
ItemID BIGINT, ItemDesc STRING, ItemName STRING,ItemsPrice DOUBLE,
SalesId BIGINT,ItemQty BIGINT)
COMMENT 'De normalized data'
PARTITIONED BY(SalesDate STRING);
Data is partitioned so that a search or select can access only part of it. The logical column for such a partition is a timestamp or date. The main consideration when choosing a partition is file size. File and partition information is stored in the name node, so too many partitions on small files will be heavy on the name node. Reading small files from each partition is also a poor use of reader operations and memory. If the files are a few HDFS blocks in size, partitioning works well. If the files are too small, they can be merged together to make a reasonable partition. For example, if daily files are a few kilobytes each, many such files can be put together in a monthly or weekly partition, depending on data volume.
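A sketch of how the flat table might be queried and, where daily partitions are too small, rolled up into monthly partitions. The table Sales_Fact_Flat_Monthly and its SalesMonth column are hypothetical, and SalesDate is assumed to be stored as a string beginning with yyyy-MM-dd.

```sql
-- No join needed: quantity and revenue per item for January 2016.
SELECT ItemName,
       SUM(ItemQty)              AS total_qty,
       SUM(ItemQty * ItemsPrice) AS total_revenue
FROM Sales_Fact_Flat
WHERE SalesDate >= '2016-01-01' AND SalesDate < '2016-02-01'
GROUP BY ItemName;

-- Monthly-partitioned copy for small daily volumes (hypothetical table).
CREATE TABLE Sales_Fact_Flat_Monthly (
  customerId BIGINT, CustomerName STRING, Email STRING, Phone STRING,
  ItemID BIGINT, ItemDesc STRING, ItemName STRING, ItemsPrice DOUBLE,
  SalesId BIGINT, ItemQty BIGINT, SalesDate STRING
)
PARTITIONED BY (SalesMonth STRING);

SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;

-- substr(SalesDate, 1, 7) yields yyyy-MM, which becomes the partition value.
INSERT OVERWRITE TABLE Sales_Fact_Flat_Monthly PARTITION (SalesMonth)
SELECT customerId, CustomerName, Email, Phone,
       ItemID, ItemDesc, ItemName, ItemsPrice,
       SalesId, ItemQty, SalesDate,
       substr(SalesDate, 1, 7) AS SalesMonth
FROM Sales_Fact_Flat;
```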
Advantages:
- Faster query execution
- Suitable for out-of-the-box analytics
Disadvantages:
- Redundant data - You can see that customer and item data is repeated across the sales information. Storage is not a primary concern on Hadoop; if you decide to avoid redundancy, the trade-off is between space and performance.
- Not a good fit where data has to be updated constantly
When to pick denormalization over method 1 (normalized data / star schema)?
- When the joined tables are small and have a one-to-one match when joined.
- When the number of columns is small. Wide tables consume more space, and the block read size becomes large.
- When data is not updated much. If you have to go back and update random fields all the time, the denormalization technique will not work. For example, if an age field is stored as part of each transaction, it has to be modified every year.
Achieving the same logical model on denormalized data
Most organizations use Hadoop to complement an existing, well-established EDW. The models are built agnostic to the platform, but as we have seen, Hadoop requires a different kind of physical implementation compared to an RDBMS. Irrespective of that, the same logical model can be implemented in multiple ways.
Splitting to dimensions / sub-tables
The denormalized sales data above can be split into the dimensions and facts we have already discussed.
To achieve this, either:
Build a non-materialized view on top of the Sales_Fact_Flat table. Customer information can be recreated using a view:
create view Customer_Info as select distinct customerId, CustomerName, Email, Phone from Sales_Fact_Flat;
Other views can be recreated similarly. The downside of views is that every time a view is used in a query, a MapReduce job runs behind the scenes to materialize the result.
Or, instead of views, materialize the data into tables using CTAS (create table as select):
create table Customer_Info as select distinct customerId, CustomerName, Email, Phone from Sales_Fact_Flat;
The advantage of these methods is that the views or tables can be used as independent entities for other analysis.
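The item dimension and the sales fact could be derived from the flat table in the same way. This sketch assumes the Method 1 tables of the same names do not already exist in the database; choose other names if they do.

```sql
-- Item dimension as a non-materialized view.
CREATE VIEW Items_Info AS
SELECT DISTINCT ItemID, ItemDesc, ItemName, ItemsPrice
FROM Sales_Fact_Flat;

-- Sales fact materialized with CTAS. The result here is a plain table:
-- it carries SalesDate as an ordinary column, not as a partition.
CREATE TABLE Sales_Fact AS
SELECT SalesId, customerId, ItemID, ItemQty, SalesDate
FROM Sales_Fact_Flat;
```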
To summarize, this blog discusses different viable options for implementing a data warehouse on the Hadoop platform. It also covers the file and folder layout options based on access pattern, along with optimization and storage techniques.
I hope this article helps. Please use the comment section for further discussion.