2222import java .util .stream .Collectors ;
2323import lombok .NoArgsConstructor ;
2424import lombok .extern .slf4j .Slf4j ;
25- import org .rocksdb .BlockBasedTableConfig ;
26- import org .rocksdb .BloomFilter ;
2725import org .rocksdb .Checkpoint ;
28- import org .rocksdb .DirectComparator ;
2926import org .rocksdb .InfoLogLevel ;
3027import org .rocksdb .Logger ;
3128import org .rocksdb .Options ;
3229import org .rocksdb .ReadOptions ;
3330import org .rocksdb .RocksDB ;
3431import org .rocksdb .RocksDBException ;
3532import org .rocksdb .RocksIterator ;
36- import org .rocksdb .Statistics ;
3733import org .rocksdb .Status ;
3834import org .rocksdb .WriteBatch ;
3935import org .rocksdb .WriteOptions ;
5652public class RocksDbDataSourceImpl extends DbStat implements DbSourceInter <byte []>,
5753 Iterable <Map .Entry <byte [], byte []>>, Instance <RocksDbDataSourceImpl > {
5854
59- ReadOptions readOpts ;
6055 private String dataBaseName ;
6156 private RocksDB database ;
57+ private Options options ;
6258 private volatile boolean alive ;
6359 private String parentPath ;
6460 private ReadWriteLock resetDbLock = new ReentrantReadWriteLock ();
6561 private static final String KEY_ENGINE = "ENGINE" ;
6662 private static final String ROCKSDB = "ROCKSDB" ;
67- private DirectComparator comparator ;
6863 private static final org .slf4j .Logger rocksDbLogger = LoggerFactory .getLogger (ROCKSDB );
6964
70- public RocksDbDataSourceImpl (String parentPath , String name , RocksDbSettings settings ,
71- DirectComparator comparator ) {
65+ public RocksDbDataSourceImpl (String parentPath , String name , Options options ) {
7266 this .dataBaseName = name ;
7367 this .parentPath = parentPath ;
74- this .comparator = comparator ;
75- RocksDbSettings .setRocksDbSettings (settings );
76- initDB ();
77- }
78-
79- public RocksDbDataSourceImpl (String parentPath , String name , RocksDbSettings settings ) {
80- this .dataBaseName = name ;
81- this .parentPath = parentPath ;
82- RocksDbSettings .setRocksDbSettings (settings );
68+ this .options = options ;
8369 initDB ();
8470 }
8571
72+ @ VisibleForTesting
8673 public RocksDbDataSourceImpl (String parentPath , String name ) {
8774 this .parentPath = parentPath ;
8875 this .dataBaseName = name ;
76+ this .options = RocksDbSettings .getOptionsByDbName (name );
8977 }
9078
9179 public Path getDbPath () {
@@ -232,10 +220,6 @@ public void initDB() {
232220 "Cannot open LevelDB database '%s' with RocksDB engine."
233221 + " Set db.engine=LEVELDB or use RocksDB database. " , dataBaseName ));
234222 }
235- initDB (RocksDbSettings .getSettings ());
236- }
237-
238- public void initDB (RocksDbSettings settings ) {
239223 resetDbLock .writeLock ().lock ();
240224 try {
241225 if (isAlive ()) {
@@ -244,81 +228,40 @@ public void initDB(RocksDbSettings settings) {
244228 if (dataBaseName == null ) {
245229 throw new IllegalArgumentException ("No name set to the dbStore" );
246230 }
231+ options .setLogger (new Logger (options ) {
232+ @ Override
233+ protected void log (InfoLogLevel infoLogLevel , String logMsg ) {
234+ rocksDbLogger .info ("{} {}" , dataBaseName , logMsg );
235+ }
236+ });
247237
248- try ( Options options = new Options ()) {
249-
250- // most of these options are suggested by https://github.com/facebook/rocksdb/wiki/Set-Up-Options
238+ try {
239+ logger . debug ( "Opening database {}." , dataBaseName );
240+ final Path dbPath = getDbPath ();
251241
252- // general options
253- if (settings .isEnableStatistics ()) {
254- options .setStatistics (new Statistics ());
255- options .setStatsDumpPeriodSec (60 );
256- }
257- options .setCreateIfMissing (true );
258- options .setIncreaseParallelism (1 );
259- options .setLevelCompactionDynamicLevelBytes (true );
260- options .setMaxOpenFiles (settings .getMaxOpenFiles ());
261-
262- // general options supported user config
263- options .setNumLevels (settings .getLevelNumber ());
264- options .setMaxBytesForLevelMultiplier (settings .getMaxBytesForLevelMultiplier ());
265- options .setMaxBytesForLevelBase (settings .getMaxBytesForLevelBase ());
266- options .setMaxBackgroundCompactions (settings .getCompactThreads ());
267- options .setLevel0FileNumCompactionTrigger (settings .getLevel0FileNumCompactionTrigger ());
268- options .setTargetFileSizeMultiplier (settings .getTargetFileSizeMultiplier ());
269- options .setTargetFileSizeBase (settings .getTargetFileSizeBase ());
270- if (comparator != null ) {
271- options .setComparator (comparator );
242+ if (!Files .isSymbolicLink (dbPath .getParent ())) {
243+ Files .createDirectories (dbPath .getParent ());
272244 }
273- options .setLogger (new Logger (options ) {
274- @ Override
275- protected void log (InfoLogLevel infoLogLevel , String logMsg ) {
276- rocksDbLogger .info ("{} {}" , dataBaseName , logMsg );
277- }
278- });
279-
280- // table options
281- final BlockBasedTableConfig tableCfg ;
282- options .setTableFormatConfig (tableCfg = new BlockBasedTableConfig ());
283- tableCfg .setBlockSize (settings .getBlockSize ());
284- tableCfg .setBlockCache (RocksDbSettings .getCache ());
285- tableCfg .setCacheIndexAndFilterBlocks (true );
286- tableCfg .setPinL0FilterAndIndexBlocksInCache (true );
287- tableCfg .setFilter (new BloomFilter (10 , false ));
288-
289- // read options
290- readOpts = new ReadOptions ();
291- readOpts = readOpts .setPrefixSameAsStart (true )
292- .setVerifyChecksums (false );
293245
294246 try {
295- logger .debug ("Opening database {}." , dataBaseName );
296- final Path dbPath = getDbPath ();
297-
298- if (!Files .isSymbolicLink (dbPath .getParent ())) {
299- Files .createDirectories (dbPath .getParent ());
247+ database = RocksDB .open (options , dbPath .toString ());
248+ } catch (RocksDBException e ) {
249+ if (Objects .equals (e .getStatus ().getCode (), Status .Code .Corruption )) {
250+ logger .error ("Database {} corrupted, please delete database directory({}) "
251+ + "and restart." , dataBaseName , parentPath , e );
252+ } else {
253+ logger .error ("Open Database {} failed" , dataBaseName , e );
300254 }
301-
302- try {
303- database = RocksDB .open (options , dbPath .toString ());
304- } catch (RocksDBException e ) {
305- if (Objects .equals (e .getStatus ().getCode (), Status .Code .Corruption )) {
306- logger .error ("Database {} corrupted, please delete database directory({}) " +
307- "and restart." , dataBaseName , parentPath , e );
308- } else {
309- logger .error ("Open Database {} failed" , dataBaseName , e );
310- }
311- throw new TronError (e , TronError .ErrCode .ROCKSDB_INIT );
312- }
313-
314- alive = true ;
315- } catch (IOException ioe ) {
316- throw new RuntimeException (
317- String .format ("failed to init database: %s" , dataBaseName ), ioe );
255+ throw new TronError (e , TronError .ErrCode .ROCKSDB_INIT );
318256 }
319257
320- logger .debug ("Init DB {} done." , dataBaseName );
258+ alive = true ;
259+ } catch (IOException ioe ) {
260+ throw new RuntimeException (
261+ String .format ("failed to init database: %s" , dataBaseName ), ioe );
321262 }
263+
264+ logger .debug ("Init DB {} done." , dataBaseName );
322265 } finally {
323266 resetDbLock .writeLock ().unlock ();
324267 }
@@ -523,7 +466,8 @@ public boolean deleteDbBakPath(String dir) {
523466
524467 @ Override
525468 public RocksDbDataSourceImpl newInstance () {
526- return new RocksDbDataSourceImpl (parentPath , dataBaseName , RocksDbSettings .getSettings ());
469+ return new RocksDbDataSourceImpl (parentPath , dataBaseName ,
470+ this .options );
527471 }
528472
529473
0 commit comments