Wednesday, September 25, 2013

Concurrent merge sorting of a large number of different types of files in Java

I recently worked on a customer project, part of which required sorting and merging multiple large files based on a timestamp field in the files. These files were Call Data Record (CDR) files, which are basically CSV files containing usage records for all 2G, 3G and GPRS subscribers for each day. There were about 2000 files in total, with a varied number of 2G, 3G and GPRS files, and a total size of roughly 200 GB.

Although this can be done using BigMemory, the customer wanted a quick and dirty way of doing this.

Each of the files followed a particular naming convention that tells us what type of file it is: the first 2 or 3 characters identify the type. Ex. File names –

3G
'GMI02A_13082900_0183'
'GMI02A_13082900_0184'
  
2G
'RTPAHLR1_13082700_8853'
'RTPAHLR1_13082701_8854'
'RTPAHLR1_13082702_8855'
'RTPAHLR1_13082703_8856'
'RTPAHLR1_13082704_8857'

GPRS
'MOU05_1308270005'
'MOU05_1308270020'
'MOU05_1308270035'

As you can see, the characters before the first underscore identify the type of file, and the characters after it are timestamps. If we sort and group the files by these timestamps, it is likely that the file contents will cover the same range of timestamps. An interesting thing to note is that there can be multiple files per timestamp; for 2G and 3G, the sequence number after the 2nd underscore distinguishes them.
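
For illustration, here is a minimal sketch of pulling the type and timestamp out of a file name (the prefixes are taken from the sample names above; the real matching may differ):

class CdrFileNames {
    // Illustrative prefixes, taken from the sample names above
    static String fileType(String name) {
        if (name.startsWith("GMI")) return "3G";
        if (name.startsWith("RTP")) return "2G";
        if (name.startsWith("MOU")) return "GPRS";
        throw new IllegalArgumentException("Unknown file type: " + name);
    }

    static String fileTimestamp(String name) {
        // 'GMI02A_13082900_0183' -> '13082900', 'MOU05_1308270005' -> '1308270005'
        return name.split("_")[1];
    }
}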

A single sample line from each of the CSVs (with dummy data) was as follows –

2G
5x944324461a1b18d08d7ab25d9fa1595f,'ABCDFE12_13050112_9998',' ','','4','43',' ',' ','','','466974301229529','886983095821','','',' ',' ','','','2013-05-01 11:10:58.000',307,' ','0','0',' ','','','                        ',' ',' ','','',' ',' ','',' ',' ',' ','','1','2','1','6899066686','','','','1','2','1','6899066686',' ',' ','','','                ',' ',' ','','','      ','08-0-9-3    ','08-0-9-3    ','                                  ',' ABCDFE12   ','17','0','  ',' ',' ',0,'IGMRL01','2-15','ONXDEM','7-25','0','','','778158',0,0,'','',' ',,'','','',,,'','','','','','','',' ',0,'','','','','','','',' ',' ','','',' ','  ',' ','  ','8DCA28F284000007','1','1','1 ','656935***319','','','','','','',' ','  ','  ','2013-05-01 11:04:58.000',307,' ','',' ',181,'1','1','1 ','656935***319',' ','        ',' ',' ',' ','  ','  ','0','2','0','1','D9110935388202','','  ','','','',' ','',' ','  ','  ',,'','',' ',' ',' ',' ',' ',' ','  ','','','','','653095821','456683539688','6544066686','0823206559','','','','','','','654066686','34222206559','',''

GPRS
5x944324461a1f18d093bfb25d9fa1595f,'MCCDSC_1305011035','18','0','3','1339655','466974301229529','2013-05-01 10:03:37.000','355026050423934','886983095821','133.22.29.4','31.39.11.321','11892013','INTERNET','CMSC099.M22dd66.GPRS','1','111.334.23.12','413427','239','58844','1','8 ','46697 ','0','1','2',0,0,'02','FF','00','00','00','FE','00','00','00','00','00','4A','00','02','02','93','96','83','FE','74','81','FF','FF','00','00','00','2013-05-01 09:56:37.000','2013-05-01 10:03:03.000',386,'0 ','0',0,0,'00','00','00','00','00','00','00','00','00','00','00','00',,,0,'0','','0','0','','','00','0',0,0,0,0,,,,,'5630934411','7331295359688','00','00','00','3','1','1','0','0','0','',' ','',' ','','',' ','01306800','Samsung','Galaxy','Handset','1','','','','','','0','','0','0'

3G
5x944324461a1f18d093ba1595f,'DGCCSD_13050112_2938',1,1,1,1,1,1,1,1,0,'2013-05-01 12:55:08.000',0,'2013-05-01 12:48:04.000','11','00','413B','55','0AD8','2013-05-01 12:47:59.000','03','FF','  ','FFFF','','466974104424565','','','','','','','','FF','939644958               ','05','06','5430***660','05','06','65535',65535,,,'65535',,'FF',65535,,,'00','03','FFFF','67822059982110','123974700000830','','0000','0000','0000','0000','0000','0000','07','886983***686','05','05','8488',21111,466,97,'19546',886935***416,'05',21401,466,97,'FFFFFFFFFFFFFFFF',,'FF','FF','00000000','  ','3','8','2013-05-01 12:50:49.000','2013-05-01 12:48:08.000','F5B9','8130***660','05','06','00','    ','FFFF','FF',0,'1233804097','05','04',0,0,0,0,0,0,'886935***374',0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,'  ','2013-05-01 12:47:59.000',,'',,,'00',0,0,0,0,0,0,0,0,0,'413B','55','0AD8','','FF','10','FF','00',0,0,'','  ','  ',,0,0,0,0,0,161,161.11,'01','0',0,'000000','08',,53,'1246',0,0,'FFFF','FFFFF','01','FF',436,62871450,'00','1','00','','  ','  ','FF','FF','FF','FF','FF','FF','FF',0,'2013-05-01 12:47:58.000',2,'00',,,,,,,,,'',,'      ','FFFF','FF','00','FF','FF','00','FF','2013-05-01 12:47:59.000','642***660','1243408079','823***686','124191389688','121***660','066208079','','','',''

The idea was to read 3 files (one of each type) at a time and insert their lines into a shared sorted buffer. Once the reading is done, write the sorted buffer out to an output file, clear it, and move on to the next batch of 3 files. Another requirement for the writer was that the data should not all be merge-sorted into a single file; rather, it should be split into multiple output files. So the file writer thread must keep track of the number of lines written and roll over to the next output file.

The problem with this approach is that not all files in a batch will cover the same timestamp range.
Ex. 2G might run till timestamp T5, 3G till timestamp T7 and GPRS till timestamp T4.
In this case we can only flush data till timestamp T4, and must retain all the data after it in the buffer, since the next batch of files might contain records for those timestamps. This puts additional memory pressure on the heap.
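
A minimal sketch of that selective flush, assuming the sorted buffer shown further below and hypothetical per-type high-water marks (max2G, max3G, maxGprs):

import java.util.Arrays;
import java.util.Collections;
import java.util.NavigableMap;
import java.util.Queue;
import java.util.concurrent.ConcurrentSkipListMap;

// Sketch only: flush entries up to the smallest "last timestamp seen" across
// the three file types and keep everything newer buffered for the next batch.
static void flushSafePortion(ConcurrentSkipListMap<String, Queue<String>> buffer,
                             String max2G, String max3G, String maxGprs) {
    String watermark = Collections.min(Arrays.asList(max2G, max3G, maxGprs));
    // headMap is a live view; clearing it removes the flushed keys from the buffer
    NavigableMap<String, Queue<String>> flushable = buffer.headMap(watermark, true);
    for (Queue<String> lines : flushable.values()) {
        for (String line : lines) {
            System.out.println(line); // stand-in for the real CSV writer
        }
    }
    flushable.clear();
}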

The approach I used was a modified concurrent external merge sort:
  1. Based on the producer-consumer pattern
  2. 3 threads read from the 3 files in parallel (2G, 3G and GPRS) and insert into a shared task buffer
  3. 1 thread consumes from the buffer
  4. Reader threads read line by line and insert into a data structure that internally sorts on timestamp, via a custom comparator, on each insert. It is basically a ConcurrentSkipListMap backed by several ConcurrentLinkedQueues: the key of the Map is the timestamp, and the value is the queue of lines associated with that timestamp
  5. After each reader thread finishes inserting into the task buffer, it waits on a CyclicBarrier. The last thread to reach the barrier notifies the consumer that reading and sorting for the batch is complete
  6. The consumer is awakened and just spits the sorted Map out into a CSV
  7. Once the file is written, the CyclicBarrier is reset and the cycle is repeated for the next batch of files


Here is a simplified sketch of the sorted map (the full version is in the code base linked at the end):
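
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListMap;

// Shared sorted buffer: key = timestamp, value = queue of CSV lines for that
// timestamp. The skip list keeps keys sorted on every insert; natural String
// ordering works for 'yyyy-MM-dd HH:mm:ss.SSS' keys, otherwise pass a custom
// Comparator to the constructor.
class SortedBuffer {
    final ConcurrentSkipListMap<String, Queue<String>> map =
            new ConcurrentSkipListMap<String, Queue<String>>();

    void insert(String timestamp, String line) {
        Queue<String> lines = map.get(timestamp);
        if (lines == null) {
            Queue<String> fresh = new ConcurrentLinkedQueue<String>();
            lines = map.putIfAbsent(timestamp, fresh); // atomic: first insert wins
            if (lines == null) lines = fresh;
        }
        lines.add(line);
    }
}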

A simplified sketch of the reader (producer) thread looks like this:
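
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.concurrent.CyclicBarrier;

// Sketch of a reader thread: streams its file line by line into the shared
// buffer, then waits on the barrier. extractTimestamp() is a hypothetical
// helper that pulls the timestamp field out of a CDR line.
class ReaderTask implements Runnable {
    private final File file;
    private final SortedBuffer buffer;
    private final CyclicBarrier barrier;

    ReaderTask(File file, SortedBuffer buffer, CyclicBarrier barrier) {
        this.file = file;
        this.buffer = buffer;
        this.barrier = barrier;
    }

    public void run() {
        try {
            BufferedReader in = new BufferedReader(new FileReader(file));
            try {
                String line;
                while ((line = in.readLine()) != null) {
                    buffer.insert(extractTimestamp(line), line);
                }
            } finally {
                in.close();
            }
            barrier.await(); // the last thread to arrive wakes the writer
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private String extractTimestamp(String csvLine) {
        // hypothetical: the field index differs per file type in the real data
        return csvLine.split(",")[18];
    }
}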

And a simplified sketch of the writer (consumer) thread looks like this:
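
import java.io.FileWriter;
import java.io.PrintWriter;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Semaphore;

// Sketch of the writer thread: blocks until the last reader trips the barrier
// (whose action releases the semaphore), then streams the sorted map out,
// rolling over to a new output file every MAX_LINES lines. The threshold and
// file names are illustrative; shutdown handling is omitted for brevity.
class WriterTask implements Runnable {
    static final int MAX_LINES = 1000000;

    private final SortedBuffer buffer;
    private final Semaphore batchReady;
    private int linesWritten = 0;
    private int fileIndex = 0;

    WriterTask(SortedBuffer buffer, Semaphore batchReady) {
        this.buffer = buffer;
        this.batchReady = batchReady;
    }

    public void run() {
        try {
            PrintWriter out = new PrintWriter(new FileWriter("sorted_0.csv"));
            while (!Thread.currentThread().isInterrupted()) {
                batchReady.acquire(); // released by the barrier action below
                for (Map.Entry<String, Queue<String>> e : buffer.map.entrySet()) {
                    for (String line : e.getValue()) {
                        out.println(line);
                        if (++linesWritten >= MAX_LINES) { // roll over
                            out.close();
                            fileIndex++;
                            out = new PrintWriter(new FileWriter("sorted_" + fileIndex + ".csv"));
                            linesWritten = 0;
                        }
                    }
                }
                buffer.map.clear(); // buffer is emptied for the next batch
                out.flush();
            }
            out.close();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

The two sides are wired together with a barrier action, e.g.:

final Semaphore batchReady = new Semaphore(0);
final CyclicBarrier barrier = new CyclicBarrier(3, new Runnable() {
    public void run() {
        batchReady.release(); // wake the writer once all 3 readers are done
    }
});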

A sample flow of the threads with CyclicBarrier coordination is as follows –
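
Reader-2G:   read + insert --> barrier.await() --+
Reader-3G:   read + insert --> barrier.await() --+--> barrier action releases the writer
Reader-GPRS: read + insert --> barrier.await() --+
Writer:      acquire() --> write sorted map to CSVs (rolling over) --> clear buffer
(the barrier resets automatically, and the readers pick up the next batch of 3 files)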


On a 4-core machine with 35 GB of RAM, this took about 2 hours to complete for 120 GB worth of data files. This solution worked for my use case; optimally, though, you would read only a predefined number of lines into the buffer rather than the entire file, to avoid heap pressure.
On a side note, I tested this with both the CMS and G1 garbage collectors, and found CMS to be much more performant and predictable for my use case.
The entire code base is available here: https://github.com/sourabhghose/LargeFileMergeSort