File tree Expand file tree Collapse file tree
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm Expand file tree Collapse file tree Original file line number Diff line number Diff line change @@ -84,6 +84,15 @@ public class OzoneClientConfig {
8484 tags = ConfigTag .CLIENT )
8585 private boolean datastreamPipelineMode = true ;
8686
87+ @ Config (key = "ozone.client.datastream.sync.size" ,
88+ defaultValue = "0B" ,
89+ type = ConfigType .SIZE ,
90+ description = "The minimum size of written data before forcing the datanodes " +
91+ "in the pipeline to flush the pending data to underlying storage." +
92+ " If set to zero or negative, the client will not force the datanodes to flush." ,
93+ tags = ConfigTag .CLIENT )
94+ private int dataStreamSyncSize = 0 ;
95+
8796 @ Config (key = "ozone.client.stream.buffer.increment" ,
8897 defaultValue = "0B" ,
8998 type = ConfigType .SIZE ,
@@ -570,6 +579,10 @@ public void setDatastreamPipelineMode(boolean datastreamPipelineMode) {
570579 this .datastreamPipelineMode = datastreamPipelineMode ;
571580 }
572581
582+ public int getDataStreamSyncSize () {
583+ return dataStreamSyncSize ;
584+ }
585+
573586 public void setHBaseEnhancementsAllowed (boolean isHBaseEnhancementsEnabled ) {
574587 this .hbaseEnhancementsAllowed = isHBaseEnhancementsEnabled ;
575588 }
Original file line number Diff line number Diff line change @@ -131,7 +131,7 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
131131 private final DataStreamOutput out ;
132132 private CompletableFuture <DataStreamReply > dataStreamCloseReply ;
133133 private List <CompletableFuture <DataStreamReply >> futures = new ArrayList <>();
134- private static final long SYNC_SIZE = 0 ; // TODO: disk sync is disabled for now
134+ private final long syncSize ;
135135 private long syncPosition = 0 ;
136136 private StreamBuffer currentBuffer ;
137137 private XceiverClientMetrics metrics ;
@@ -157,6 +157,7 @@ public BlockDataStreamOutput(
157157 this .xceiverClientFactory = xceiverClientManager ;
158158 this .config = config ;
159159 this .isDatastreamPipelineMode = config .isDatastreamPipelineMode ();
160+ this .syncSize = config .getDataStreamSyncSize ();
160161 this .blockID = new AtomicReference <>(blockID );
161162 KeyValue keyValue =
162163 KeyValue .newBuilder ().setKey ("TYPE" ).setValue ("KEY" ).build ();
@@ -647,9 +648,8 @@ public boolean isClosed() {
647648 }
648649
649650 private boolean needSync (long position ) {
650- if (SYNC_SIZE > 0 ) {
651- // TODO: or position >= fileLength
652- if (position - syncPosition >= SYNC_SIZE ) {
651+ if (syncSize > 0 ) {
652+ if (position - syncPosition >= syncSize ) {
653653 syncPosition = position ;
654654 return true ;
655655 }
You can’t perform that action at this time.
0 commit comments