Skip to content

Commit b532062

Browse files
committed
Fix
1 parent 7563ac8 commit b532062

4 files changed

Lines changed: 227 additions & 14 deletions

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgent.java

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@
2424
import org.apache.iotdb.commons.audit.UserEntity;
2525
import org.apache.iotdb.commons.conf.CommonDescriptor;
2626
import org.apache.iotdb.commons.exception.IllegalPathException;
27+
import org.apache.iotdb.commons.i18n.PipeMessages;
2728
import org.apache.iotdb.commons.path.PartialPath;
29+
import org.apache.iotdb.commons.pipe.receiver.PipeReceiverFilePathUtils;
2830
import org.apache.iotdb.commons.queryengine.common.SessionInfo;
2931
import org.apache.iotdb.commons.utils.FileUtils;
3032
import org.apache.iotdb.db.auth.AuthorityChecker;
@@ -54,6 +56,7 @@
5456
import java.io.IOException;
5557
import java.io.RandomAccessFile;
5658
import java.nio.ByteBuffer;
59+
import java.nio.file.Paths;
5760
import java.time.ZoneId;
5861
import java.util.Map;
5962
import java.util.Objects;
@@ -267,9 +270,11 @@ private SyncIdentityInfo getCurrentSyncIdentityInfo() {
267270
* @param tsFilePipeData pipeData
268271
* @param fileDir path of file data dir
269272
*/
270-
private void handleTsFilePipeData(final TsFilePipeData tsFilePipeData, final String fileDir) {
273+
private void handleTsFilePipeData(final TsFilePipeData tsFilePipeData, final String fileDir)
274+
throws IOException {
271275
final String tsFileName = tsFilePipeData.getTsFileName();
272-
final File dir = new File(fileDir);
276+
final File tsFile = resolveFileInFileDataDir(fileDir, tsFileName);
277+
final File dir = tsFile.getParentFile();
273278
final File[] targetFiles =
274279
dir.listFiles((dir1, name) -> name.startsWith(tsFileName) && name.endsWith(PATCH_SUFFIX));
275280
if (targetFiles != null) {
@@ -311,10 +316,18 @@ public TSStatus transportFile(final TSyncTransportMetaInfo metaInfo, final ByteB
311316
final String fileDir = getFileDataDir(identityInfo);
312317
final String fileName = metaInfo.fileName;
313318
final long startIndex = metaInfo.startIndex;
314-
final File file = new File(fileDir, fileName + PATCH_SUFFIX);
319+
final File file;
320+
final File fileWithoutPatch;
321+
try {
322+
fileWithoutPatch = resolveFileInFileDataDir(fileDir, fileName);
323+
file = resolveFileInFileDataDir(fileDir, fileName + PATCH_SUFFIX);
324+
} catch (final IOException e) {
325+
LOGGER.warn(e.getMessage());
326+
return RpcUtils.getStatus(TSStatusCode.SYNC_FILE_ERROR, e.getMessage());
327+
}
315328

316329
// step2. check startIndex
317-
final IndexCheckResult result = checkStartIndexValid(new File(fileDir, fileName), startIndex);
330+
final IndexCheckResult result = checkStartIndexValid(fileWithoutPatch, startIndex);
318331
if (!result.isResult()) {
319332
return RpcUtils.getStatus(TSStatusCode.SYNC_FILE_REDIRECTION_ERROR, result.getIndex());
320333
}
@@ -326,7 +339,7 @@ public TSStatus transportFile(final TSyncTransportMetaInfo metaInfo, final ByteB
326339
final byte[] byteArray = new byte[length];
327340
buff.get(byteArray);
328341
randomAccessFile.write(byteArray);
329-
recordStartIndex(new File(fileDir, fileName), startIndex + length);
342+
recordStartIndex(fileWithoutPatch, startIndex + length);
330343
LOGGER.debug(
331344
DataNodePipeMessages.SYNC_START_AT_TO_IS_DONE, fileName, startIndex, startIndex + length);
332345
} catch (final IOException e) {
@@ -337,6 +350,23 @@ public TSStatus transportFile(final TSyncTransportMetaInfo metaInfo, final ByteB
337350
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "");
338351
}
339352

353+
private static File resolveFileInFileDataDir(final String fileDir, final String fileName)
354+
throws IOException {
355+
if (StringUtils.isEmpty(fileName)) {
356+
throw new IOException(String.format(PipeMessages.ILLEGAL_FILENAME_PATH_TRAVERSAL, fileName));
357+
}
358+
359+
final String illegalError = FileUtils.getIllegalError4Directory(fileName);
360+
if (Objects.nonNull(illegalError)) {
361+
throw new IOException(
362+
String.format(PipeMessages.ILLEGAL_FILENAME_PATH_TRAVERSAL, fileName)
363+
+ ", "
364+
+ illegalError);
365+
}
366+
367+
return PipeReceiverFilePathUtils.resolveFilePath(Paths.get(fileDir), fileName).toFile();
368+
}
369+
340370
private IndexCheckResult checkStartIndexValid(final File file, final long startIndex) {
341371
// get local index from memory map
342372
long localIndex = getCurrentFileStartIndex(file.getAbsolutePath());

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.iotdb.commons.client.property.ThriftClientProperty;
2525
import org.apache.iotdb.commons.conf.CommonConfig;
2626
import org.apache.iotdb.commons.conf.CommonDescriptor;
27+
import org.apache.iotdb.commons.conf.IoTDBConstant;
2728
import org.apache.iotdb.commons.consensus.DataRegionId;
2829
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
2930
import org.apache.iotdb.commons.pipe.config.PipeConfig;
@@ -52,6 +53,9 @@
5253
import org.apache.iotdb.rpc.IoTDBConnectionException;
5354
import org.apache.iotdb.rpc.StatementExecutionException;
5455
import org.apache.iotdb.rpc.TSStatusCode;
56+
import org.apache.iotdb.service.rpc.thrift.TSOpenSessionReq;
57+
import org.apache.iotdb.service.rpc.thrift.TSOpenSessionResp;
58+
import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
5559
import org.apache.iotdb.service.rpc.thrift.TSyncIdentityInfo;
5660
import org.apache.iotdb.service.rpc.thrift.TSyncTransportMetaInfo;
5761
import org.apache.iotdb.session.pool.SessionPool;
@@ -66,6 +70,7 @@
6670
import java.io.IOException;
6771
import java.io.RandomAccessFile;
6872
import java.nio.ByteBuffer;
73+
import java.time.ZoneId;
6974
import java.util.Arrays;
7075
import java.util.HashSet;
7176
import java.util.List;
@@ -229,6 +234,7 @@ public void handshake() throws Exception {
229234
useSSL,
230235
trustStore,
231236
trustStorePwd);
237+
openClientSession();
232238
final TSyncIdentityInfo identityInfo =
233239
new TSyncIdentityInfo(
234240
pipeName, System.currentTimeMillis(), syncConnectorVersion, databaseName);
@@ -259,6 +265,26 @@ public void handshake() throws Exception {
259265
.build();
260266
}
261267

268+
private void openClientSession() throws TException {
269+
final TSOpenSessionReq openSessionReq = new TSOpenSessionReq();
270+
openSessionReq.setClient_protocol(TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3);
271+
openSessionReq.setUsername(user);
272+
openSessionReq.setPassword(password);
273+
openSessionReq.setZoneId(ZoneId.systemDefault().toString());
274+
openSessionReq.putToConfiguration("version", IoTDBConstant.ClientVersion.V_1_0.toString());
275+
openSessionReq.putToConfiguration("sql_dialect", "tree");
276+
277+
final TSOpenSessionResp openSessionResp = client.openSession(openSessionReq);
278+
if (openSessionResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
279+
final String errorMsg =
280+
String.format(
281+
"Failed to login to receiver %s:%s for legacy pipe transfer because %s",
282+
ipAddress, port, openSessionResp.getStatus().getMessage());
283+
LOGGER.warn(errorMsg);
284+
throw new PipeRuntimeCriticalException(errorMsg);
285+
}
286+
}
287+
262288
@Override
263289
public void heartbeat() throws Exception {
264290
// do nothing

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java

Lines changed: 44 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.iotdb.commons.audit.AuditEventType;
2929
import org.apache.iotdb.commons.audit.AuditLogFields;
3030
import org.apache.iotdb.commons.audit.AuditLogOperation;
31+
import org.apache.iotdb.commons.auth.entity.PrivilegeType;
3132
import org.apache.iotdb.commons.client.exception.ClientManagerException;
3233
import org.apache.iotdb.commons.conf.CommonConfig;
3334
import org.apache.iotdb.commons.conf.CommonDescriptor;
@@ -3398,24 +3399,58 @@ public TSStatus createTimeseriesUsingSchemaTemplate(TCreateTimeseriesUsingSchema
33983399

33993400
@Override
34003401
public TSStatus handshake(final TSyncIdentityInfo info) throws TException {
3401-
return PipeDataNodeAgent.receiver()
3402-
.legacy()
3403-
.handshake(
3404-
info,
3405-
SESSION_MANAGER.getCurrSession().getClientAddress(),
3406-
partitionFetcher,
3407-
schemaFetcher);
3402+
try {
3403+
final TSStatus status = checkLegacyPipeReceiverPermission();
3404+
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
3405+
return status;
3406+
}
3407+
return PipeDataNodeAgent.receiver()
3408+
.legacy()
3409+
.handshake(
3410+
info,
3411+
SESSION_MANAGER.getCurrSession().getClientAddress(),
3412+
partitionFetcher,
3413+
schemaFetcher);
3414+
} finally {
3415+
SESSION_MANAGER.updateIdleTime();
3416+
}
34083417
}
34093418

34103419
@Override
34113420
public TSStatus sendPipeData(final ByteBuffer buff) throws TException {
3412-
return PipeDataNodeAgent.receiver().legacy().transportPipeData(buff);
3421+
try {
3422+
final TSStatus status = checkLegacyPipeReceiverPermission();
3423+
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
3424+
return status;
3425+
}
3426+
return PipeDataNodeAgent.receiver().legacy().transportPipeData(buff);
3427+
} finally {
3428+
SESSION_MANAGER.updateIdleTime();
3429+
}
34133430
}
34143431

34153432
@Override
34163433
public TSStatus sendFile(final TSyncTransportMetaInfo metaInfo, final ByteBuffer buff)
34173434
throws TException {
3418-
return PipeDataNodeAgent.receiver().legacy().transportFile(metaInfo, buff);
3435+
try {
3436+
final TSStatus status = checkLegacyPipeReceiverPermission();
3437+
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
3438+
return status;
3439+
}
3440+
return PipeDataNodeAgent.receiver().legacy().transportFile(metaInfo, buff);
3441+
} finally {
3442+
SESSION_MANAGER.updateIdleTime();
3443+
}
3444+
}
3445+
3446+
private TSStatus checkLegacyPipeReceiverPermission() {
3447+
final IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
3448+
if (!SESSION_MANAGER.checkLogin(clientSession)) {
3449+
return getNotLoggedInStatus();
3450+
}
3451+
return AuthorityChecker.getTSStatus(
3452+
AuthorityChecker.checkSystemPermission(clientSession.getUsername(), PrivilegeType.USE_PIPE),
3453+
PrivilegeType.USE_PIPE);
34193454
}
34203455

34213456
@Override
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.db.pipe.receiver.protocol.legacy;
21+
22+
import org.apache.iotdb.common.rpc.thrift.TSStatus;
23+
import org.apache.iotdb.commons.conf.CommonDescriptor;
24+
import org.apache.iotdb.db.pipe.sink.payload.legacy.TsFilePipeData;
25+
import org.apache.iotdb.rpc.TSStatusCode;
26+
import org.apache.iotdb.service.rpc.thrift.TSyncIdentityInfo;
27+
import org.apache.iotdb.service.rpc.thrift.TSyncTransportMetaInfo;
28+
29+
import org.junit.After;
30+
import org.junit.Assert;
31+
import org.junit.Before;
32+
import org.junit.Test;
33+
34+
import java.io.File;
35+
import java.nio.ByteBuffer;
36+
import java.nio.charset.StandardCharsets;
37+
import java.nio.file.Files;
38+
import java.nio.file.Path;
39+
40+
public class IoTDBLegacyPipeReceiverAgentTest {
41+
42+
private static final String PIPE_NAME = "poc";
43+
private static final long CREATE_TIME = 1700000000000L;
44+
private static final String REMOTE_ADDRESS = "127.0.0.1";
45+
46+
private String originalSyncDir;
47+
private Path syncDir;
48+
private IoTDBLegacyPipeReceiverAgent agent;
49+
50+
@Before
51+
public void setUp() throws Exception {
52+
originalSyncDir = CommonDescriptor.getInstance().getConfig().getSyncDir();
53+
syncDir = Files.createTempDirectory("legacy-pipe-receiver");
54+
CommonDescriptor.getInstance().getConfig().setSyncDir(syncDir.toString());
55+
56+
agent = new IoTDBLegacyPipeReceiverAgent();
57+
final TSStatus status =
58+
agent.handshake(
59+
new TSyncIdentityInfo(PIPE_NAME, CREATE_TIME, "UNKNOWN", ""),
60+
REMOTE_ADDRESS,
61+
null,
62+
null);
63+
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
64+
}
65+
66+
@After
67+
public void tearDown() throws Exception {
68+
if (agent != null) {
69+
agent.handleClientExit();
70+
}
71+
CommonDescriptor.getInstance().getConfig().setSyncDir(originalSyncDir);
72+
if (syncDir != null) {
73+
org.apache.tsfile.external.commons.io.FileUtils.deleteDirectory(syncDir.toFile());
74+
}
75+
}
76+
77+
@Test
78+
public void testTransportFileRejectsPathTraversal() throws Exception {
79+
final String traversal =
80+
".." + File.separator + ".." + File.separator + ".." + File.separator + "pwned";
81+
82+
final TSStatus status =
83+
agent.transportFile(
84+
new TSyncTransportMetaInfo(traversal, 0),
85+
ByteBuffer.wrap("pwned".getBytes(StandardCharsets.UTF_8)));
86+
87+
Assert.assertEquals(TSStatusCode.SYNC_FILE_ERROR.getStatusCode(), status.getCode());
88+
Assert.assertTrue(status.getMessage().contains("Illegal fileName"));
89+
Assert.assertFalse(Files.exists(syncDir.resolve("pwned.patch")));
90+
}
91+
92+
@Test
93+
public void testTransportFileWritesPlainFileUnderFileDataDir() throws Exception {
94+
final String fileName = "1-2-3-4.tsfile";
95+
final byte[] payload = "iotdb".getBytes(StandardCharsets.UTF_8);
96+
97+
final TSStatus status =
98+
agent.transportFile(new TSyncTransportMetaInfo(fileName, 0), ByteBuffer.wrap(payload));
99+
100+
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
101+
final Path patchFile = getFileDataDir().resolve(fileName + ".patch");
102+
Assert.assertArrayEquals(payload, Files.readAllBytes(patchFile));
103+
}
104+
105+
@Test
106+
public void testTransportPipeDataRejectsPathTraversalTsFileName() throws Exception {
107+
final String traversal = ".." + File.separator + "evil.tsfile";
108+
109+
final TSStatus status =
110+
agent.transportPipeData(ByteBuffer.wrap(new TsFilePipeData("", traversal, -1).serialize()));
111+
112+
Assert.assertEquals(TSStatusCode.PIPESERVER_ERROR.getStatusCode(), status.getCode());
113+
Assert.assertTrue(status.getMessage().contains("Illegal fileName"));
114+
}
115+
116+
private Path getFileDataDir() {
117+
return syncDir
118+
.resolve("receiver")
119+
.resolve(String.format("%s-%d-%s", PIPE_NAME, CREATE_TIME, REMOTE_ADDRESS))
120+
.resolve("file-data");
121+
}
122+
}

0 commit comments

Comments
 (0)