1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.directory.mavibot.btree.persisted;
21
22
23 import java.io.DataInputStream;
24 import java.io.DataOutputStream;
25 import java.io.File;
26 import java.io.FileInputStream;
27 import java.io.FileOutputStream;
28 import java.io.IOException;
29 import java.lang.reflect.Array;
30 import java.util.Arrays;
31 import java.util.Collections;
32 import java.util.Comparator;
33 import java.util.Iterator;
34 import java.util.NoSuchElementException;
35 import java.util.UUID;
36
37 import org.apache.directory.mavibot.btree.Tuple;
38 import org.apache.directory.mavibot.btree.util.TupleReaderWriter;
39
40
41
42
43
44
45
46 public class BulkDataSorter<K, V>
47 {
48 private File workDir;
49
50 private int splitAfter = 1000;
51
52 private Comparator<Tuple<K, V>> tupleComparator;
53
54 private TupleReaderWriter<K, V> readerWriter;
55
56 private boolean sorted;
57
58
59 public BulkDataSorter( TupleReaderWriter<K, V> readerWriter, Comparator<Tuple<K, V>> tupleComparator,
60 int splitAfter )
61 {
62 if ( splitAfter <= 0 )
63 {
64 throw new IllegalArgumentException( "Value of splitAfter parameter cannot be null" );
65 }
66
67 this.splitAfter = splitAfter;
68
69 this.workDir = new File( System.getProperty( "java.io.tmpdir" ), System.currentTimeMillis() + "-sort" );
70 workDir.mkdir();
71
72 this.readerWriter = readerWriter;
73 this.tupleComparator = tupleComparator;
74 }
75
76
77 public void sort( File dataFile ) throws IOException
78 {
79 int i = 0;
80
81 Tuple<K, V>[] arr = ( Tuple<K, V>[] ) Array.newInstance( Tuple.class, splitAfter );
82
83 Tuple<K, V> t = null;
84
85 DataInputStream in = new DataInputStream( new FileInputStream( dataFile ) );
86
87 while ( ( t = readerWriter.readUnsortedTuple( in ) ) != null )
88 {
89 arr[i++] = t;
90
91 if ( ( i % splitAfter ) == 0 )
92 {
93 i = 0;
94 Arrays.sort( arr, tupleComparator );
95
96 storeSortedData( arr );
97 }
98 }
99
100 if ( i != 0 )
101 {
102 Tuple<K, V>[] tmp = ( Tuple<K, V>[] ) Array.newInstance( Tuple.class, i );
103 System.arraycopy( arr, 0, tmp, 0, i );
104 Arrays.sort( tmp, tupleComparator );
105
106 storeSortedData( tmp );
107 }
108
109 sorted = true;
110 }
111
112
113 private void storeSortedData( Tuple<K, V>[] arr ) throws IOException
114 {
115 File tempFile = File.createTempFile( UUID.randomUUID().toString(), ".batch", workDir );
116 DataOutputStream out = new DataOutputStream( new FileOutputStream( tempFile ) );
117
118 for ( Tuple<K, V> t : arr )
119 {
120 readerWriter.storeSortedTuple( t, out );
121 }
122
123 out.flush();
124 out.close();
125 }
126
127
128 public File getWorkDir()
129 {
130 return workDir;
131 }
132
133
134 public Iterator<Tuple<K, V>> getMergeSortedTuples() throws IOException
135 {
136 if ( !sorted )
137 {
138 throw new IllegalStateException( "Data is not sorted" );
139 }
140
141 File[] batches = workDir.listFiles();
142
143 if ( batches.length == 0 )
144 {
145 return Collections.EMPTY_LIST.iterator();
146 }
147
148 final DataInputStream[] streams = new DataInputStream[batches.length];
149
150 for ( int i = 0; i < batches.length; i++ )
151 {
152 streams[i] = new DataInputStream( new FileInputStream( batches[i] ) );
153 }
154
155 Iterator<Tuple<K, V>> itr = new Iterator<Tuple<K, V>>()
156 {
157 private Tuple<K, V>[] heads = ( Tuple<K, V>[] ) Array.newInstance( Tuple.class, streams.length );
158
159 private Tuple<K, V> candidate = null;
160
161 private boolean closed;
162
163 private int candidatePos = -1;
164
165
166 @Override
167 public boolean hasNext()
168 {
169
170 if ( closed )
171 {
172 throw new IllegalStateException( "No elements to read" );
173 }
174
175 Tuple<K, V> available = null;
176
177 for ( int i = 0; i < streams.length; i++ )
178 {
179 if ( heads[i] == null )
180 {
181 heads[i] = readerWriter.readSortedTuple( streams[i] );
182 }
183
184 if ( available == null )
185 {
186 available = heads[i];
187 candidatePos = i;
188 }
189 else
190 {
191 if ( ( available != null ) && ( heads[i] != null ) )
192 {
193 int comp = tupleComparator.compare( heads[i], available );
194 if ( comp <= 0 )
195 {
196 available = heads[i];
197 candidatePos = i;
198 }
199 }
200 }
201 }
202
203 heads[candidatePos] = null;
204
205 if ( available == null )
206 {
207 for ( int i = 0; i < streams.length; i++ )
208 {
209 if ( heads[i] != null )
210 {
211 available = heads[i];
212 heads[i] = readerWriter.readUnsortedTuple( streams[i] );
213 break;
214 }
215 }
216 }
217
218 if ( available != null )
219 {
220 candidate = available;
221 return true;
222 }
223
224
225 for ( DataInputStream in : streams )
226 {
227 try
228 {
229 in.close();
230 }
231 catch ( Exception e )
232 {
233 e.printStackTrace();
234 }
235 }
236
237 closed = true;
238
239 return false;
240 }
241
242
243 @Override
244 public Tuple<K, V> next()
245 {
246 if ( candidate == null )
247 {
248 if ( !closed )
249 {
250 hasNext();
251 }
252 }
253
254 if ( candidate == null )
255 {
256 throw new NoSuchElementException( "No tuples found" );
257 }
258
259 return candidate;
260 }
261
262
263 @Override
264 public void remove()
265 {
266 throw new UnsupportedOperationException( "Not supported" );
267 }
268
269 };
270
271 return itr;
272 }
273 }