Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,12 @@ private static ObDirectLoadStatement buildStatement(ObDirectLoadConnection conne
.setParallel(parallel).setQueryTimeout(timeout).build();
}

private static ObDirectLoadStatement buildStatement(ObDirectLoadConnection connection, ObDirectLoadStatementExecutionId executionId)
throws ObDirectLoadException {
return connection.getStatementBuilder().setTableName(tableName).setDupAction(dupAction)
.setParallel(parallel).setQueryTimeout(timeout).setExecutionId(executionId).build();
}

private static class SimpleTest {

public static void run() {
Expand Down Expand Up @@ -240,9 +246,7 @@ public void run() {
executionId.decode(executionIdBytes);

connection = buildConnection(1);
statement = buildStatement(connection);

statement.resume(executionId);
statement = buildStatement(connection, executionId);

ObDirectLoadBucket bucket = new ObDirectLoadBucket();
ObObj[] rowObjs = new ObObj[2];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,8 @@ public void executeWithConnection(final ObDirectLoadRpc rpc, ObTable table, long
}
}

public synchronized ObDirectLoadStatement createStatement() throws ObDirectLoadException {
public synchronized ObDirectLoadStatement createStatement(ObDirectLoadTraceId traceId)
throws ObDirectLoadException {
if (!isInited) {
logger.warn("connection not init");
throw new ObDirectLoadIllegalStateException("connection not init");
Expand All @@ -240,7 +241,7 @@ public synchronized ObDirectLoadStatement createStatement() throws ObDirectLoadE
logger.warn("connection is closed");
throw new ObDirectLoadIllegalStateException("connection is closed");
}
ObDirectLoadStatement stmt = new ObDirectLoadStatement(this);
ObDirectLoadStatement stmt = new ObDirectLoadStatement(this, traceId);
this.statementList.addLast(stmt);
return stmt;
}
Expand All @@ -257,7 +258,9 @@ ObDirectLoadStatement buildStatement(ObDirectLoadStatement.Builder builder)
throws ObDirectLoadException {
ObDirectLoadStatement stmt = null;
try {
stmt = createStatement();
final ObDirectLoadTraceId traceId = builder.getTraceId() != null ? builder.getTraceId()
: ObDirectLoadTraceId.generateTraceId();
stmt = createStatement(traceId);
stmt.init(builder);
} catch (Exception e) {
logger.warn("build statement failed, args:" + builder, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ public class ObDirectLoadStatement {
private ObDirectLoadStatementExecutor executor = null;
private long startQueryTimeMillis = 0;

ObDirectLoadStatement(ObDirectLoadConnection connection) {
ObDirectLoadStatement(ObDirectLoadConnection connection, ObDirectLoadTraceId traceId) {
this.connection = connection;
this.traceId = ObDirectLoadTraceId.generateTraceId();
this.traceId = traceId;
this.logger = ObDirectLoadLogger.getLogger(this.traceId);
}

Expand Down Expand Up @@ -88,6 +88,9 @@ public synchronized void init(Builder builder) throws ObDirectLoadException {
obTablePool = new ObDirectLoadConnection.ObTablePool(connection, logger, queryTimeout);
obTablePool.init();
executor = new ObDirectLoadStatementExecutor(this);
if (builder.executionId != null) {
executor.resume(builder.executionId);
}
startQueryTimeMillis = System.currentTimeMillis();
isInited = true;
logger.info("statement init successful, args:" + builder);
Expand Down Expand Up @@ -294,6 +297,7 @@ public ObDirectLoadStatementExecutionId getExecutionId() throws ObDirectLoadExce
return executor.getExecutionId();
}

@Deprecated
public void resume(ObDirectLoadStatementExecutionId executionId) throws ObDirectLoadException {
if (executionId == null || !executionId.isValid()) {
logger.warn("Param 'executionId' must not be null or invalid, value:" + executionId);
Expand All @@ -306,20 +310,23 @@ public void resume(ObDirectLoadStatementExecutionId executionId) throws ObDirect

public static final class Builder {

private final ObDirectLoadConnection connection;
private final ObDirectLoadConnection connection;

private String tableName = null;
private String[] columnNames = null;
private String[] partitionNames = null;
private ObLoadDupActionType dupAction = ObLoadDupActionType.INVALID_MODE;
private String tableName = null;
private String[] columnNames = null;
private String[] partitionNames = null;
private ObLoadDupActionType dupAction = ObLoadDupActionType.INVALID_MODE;

private int parallel = 0;
private long queryTimeout = 0;
private int parallel = 0;
private long queryTimeout = 0;

private long maxErrorRowCount = 0;
private String loadMethod = "full";
private long maxErrorRowCount = 0;
private String loadMethod = "full";

private static final long MAX_QUERY_TIMEOUT = Integer.MAX_VALUE;
private ObDirectLoadTraceId traceId = null;
private ObDirectLoadStatementExecutionId executionId = null;

private static final long MAX_QUERY_TIMEOUT = Integer.MAX_VALUE;

Builder(ObDirectLoadConnection connection) {
this.connection = connection;
Expand Down Expand Up @@ -365,12 +372,22 @@ public Builder setLoadMethod(String loadMethod) {
return this;
}

public Builder setExecutionId(ObDirectLoadStatementExecutionId executionId) {
this.traceId = executionId.getTraceId();
this.executionId = executionId;
return this;
}

public ObDirectLoadTraceId getTraceId() {
return traceId;
}

public String toString() {
return String
.format(
"{tableName:%s, columnNames:%s, partitionNames:%s, dupAction:%s, parallel:%d, queryTimeout:%d, maxErrorRowCount:%d, loadMethod:%s}",
"{tableName:%s, columnNames:%s, partitionNames:%s, dupAction:%s, parallel:%d, queryTimeout:%d, maxErrorRowCount:%d, loadMethod:%s, executionId:%s}",
tableName, Arrays.toString(columnNames), Arrays.toString(partitionNames),
dupAction, parallel, queryTimeout, maxErrorRowCount, loadMethod);
dupAction, parallel, queryTimeout, maxErrorRowCount, loadMethod, executionId);
}

public ObDirectLoadStatement build() throws ObDirectLoadException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@
import java.net.InetAddress;
import java.util.concurrent.atomic.AtomicLong;

import com.alipay.oceanbase.rpc.util.ObByteBuf;
import com.alipay.oceanbase.rpc.util.Serialization;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

public class ObDirectLoadTraceId {

private final long uniqueId;
Expand All @@ -30,10 +36,6 @@ public ObDirectLoadTraceId(long uniqueId, long sequence) {
this.sequence = sequence;
}

public String toString() {
return String.format("Y%X-%016X", uniqueId, sequence);
}

public long getUniqueId() {
return uniqueId;
}
Expand All @@ -42,6 +44,40 @@ public long getSequence() {
return sequence;
}

public String toString() {
return String.format("Y%X-%016X", uniqueId, sequence);
}

public byte[] encode() {
int needBytes = (int) getEncodedSize();
ObByteBuf buf = new ObByteBuf(needBytes);
encode(buf);
return buf.bytes;
}

public void encode(ObByteBuf buf) {
Serialization.encodeVi64(buf, uniqueId);
Serialization.encodeVi64(buf, sequence);
}

public static ObDirectLoadTraceId decode(ByteBuf buf) {
long uniqueId = Serialization.decodeVi64(buf);
long sequence = Serialization.decodeVi64(buf);
return new ObDirectLoadTraceId(uniqueId, sequence);
}

public static ObDirectLoadTraceId decode(byte[] bytes) {
ByteBuf buf = Unpooled.wrappedBuffer(bytes);
return decode(buf);
}

public int getEncodedSize() {
int len = 0;
len += Serialization.getNeedBytes(uniqueId);
len += Serialization.getNeedBytes(sequence);
return len;
}

public static final ObDirectLoadTraceId DEFAULT_TRACE_ID;
public static TraceIdGenerator traceIdGenerator;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package com.alipay.oceanbase.rpc.direct_load.execution;

import com.alipay.oceanbase.rpc.direct_load.ObDirectLoadTraceId;
import com.alipay.oceanbase.rpc.direct_load.exception.ObDirectLoadException;
import com.alipay.oceanbase.rpc.direct_load.exception.ObDirectLoadIllegalArgumentException;
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObAddr;
Expand All @@ -28,9 +29,11 @@

public class ObDirectLoadStatementExecutionId {

private long tableId = 0;
private long taskId = 0;
private ObAddr svrAddr = new ObAddr();
private long tableId = 0;
private long taskId = 0;
private ObAddr svrAddr = new ObAddr();

private ObDirectLoadTraceId traceId = null;

public ObDirectLoadStatementExecutionId() {
}
Expand All @@ -46,6 +49,20 @@ public ObDirectLoadStatementExecutionId(long tableId, long taskId, ObAddr svrAdd
this.svrAddr = svrAddr;
}

public ObDirectLoadStatementExecutionId(long tableId, long taskId, ObAddr svrAddr,
ObDirectLoadTraceId traceId)
throws ObDirectLoadException {
if (tableId < 0 || taskId <= 0 || svrAddr == null || traceId == null) {
throw new ObDirectLoadIllegalArgumentException(String.format(
"invalid args, tableId:%d, taskId:%d, svrAddr:%s, traceId:%s", tableId, taskId,
svrAddr, traceId));
}
this.tableId = tableId;
this.taskId = taskId;
this.svrAddr = svrAddr;
this.traceId = traceId;
}

public long getTableId() {
return tableId;
}
Expand All @@ -58,12 +75,17 @@ public ObAddr getSvrAddr() {
return svrAddr;
}

public ObDirectLoadTraceId getTraceId() {
return traceId;
}

public boolean isValid() {
return tableId >= 0 && taskId > 0 && svrAddr.isValid();
}

public String toString() {
return String.format("{tableId:%d, taskId:%d, svrAddr:%s}", tableId, taskId, svrAddr);
return String.format("{tableId:%d, taskId:%d, svrAddr:%s, traceId:%s}", tableId, taskId,
svrAddr, traceId);
}

public byte[] encode() {
Expand All @@ -77,12 +99,18 @@ public void encode(ObByteBuf buf) {
Serialization.encodeVi64(buf, tableId);
Serialization.encodeVi64(buf, taskId);
svrAddr.encode(buf);
if (traceId != null) {
traceId.encode(buf);
}
}

public ObDirectLoadStatementExecutionId decode(ByteBuf buf) {
tableId = Serialization.decodeVi64(buf);
taskId = Serialization.decodeVi64(buf);
svrAddr.decode(buf);
if (buf.readableBytes() > 0) {
traceId = ObDirectLoadTraceId.decode(buf);
}
return this;
}

Expand All @@ -96,6 +124,9 @@ public int getEncodedSize() {
len += Serialization.getNeedBytes(tableId);
len += Serialization.getNeedBytes(taskId);
len += svrAddr.getEncodedSize();
if (traceId != null) {
len += traceId.getEncodedSize();
}
return len;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ public class ObDirectLoadStatementExecutor {
private ObAddr svrAddr = null;
private ObDirectLoadException cause = null; // 失败原因

private AtomicInteger writingCount = new AtomicInteger(0);

public ObDirectLoadStatementExecutor(ObDirectLoadStatement statement) {
this.statement = statement;
this.traceId = statement.getTraceId();
Expand Down Expand Up @@ -164,7 +166,7 @@ public synchronized void detach() throws ObDirectLoadException {
public ObDirectLoadStatementExecutionId getExecutionId() throws ObDirectLoadException {
checkState(LOADING, "getExecutionId");
ObDirectLoadStatementExecutionId executionId = new ObDirectLoadStatementExecutionId(
tableId, taskId, svrAddr);
tableId, taskId, svrAddr, traceId);
return executionId;
}

Expand Down Expand Up @@ -247,6 +249,25 @@ public void close() {
logger.warn("statement abort failed", e);
}
}
// 如果还有写没结束, 等待写结束
if (writingCount.get() > 0) {
logger.info("statement close wait write");
try {
final long startTimeMillis = System.currentTimeMillis();
long loopCnt = 0;
while (writingCount.get() > 0) {
Thread.sleep(10);
++loopCnt;
if (loopCnt % 100 == 0) {
final long curTimeMillis = System.currentTimeMillis();
logger.warn("statement has been wait write for "
+ (curTimeMillis - startTimeMillis) + " ms");
}
}
} catch (Exception e) {
logger.warn("statement wait write failed", e);
}
}
}

private synchronized void abortIfNeed() {
Expand Down Expand Up @@ -343,15 +364,23 @@ void stopHeartBeat() {

public void write(ObDirectLoadBucket bucket) throws ObDirectLoadException {
checkState(LOADING, LOADING_ONLY, "write");
ObDirectLoadStatementPromiseTask task = new ObDirectLoadStatementWriteTask(statement, this,
bucket);
task.run();
if (!task.isDone()) {
logger.warn("statement write task unexpected not done");
throw new ObDirectLoadUnexpectedException("statement write task unexpected not done");
}
if (!task.isSuccess()) {
throw task.cause();
writingCount.incrementAndGet();
try {
ObDirectLoadStatementPromiseTask task = new ObDirectLoadStatementWriteTask(statement,
this, bucket);
task.run();
if (!task.isDone()) {
logger.warn("statement write task unexpected not done");
throw new ObDirectLoadUnexpectedException(
"statement write task unexpected not done");
}
if (!task.isSuccess()) {
throw task.cause();
}
} catch (ObDirectLoadException e) {
throw e;
} finally {
writingCount.decrementAndGet();
}
}

Expand Down
Loading