Skip to content

Commit 7a35b34

Browse files
committed
scipy matrix transfer to java runtime
1 parent d3ffc70 commit 7a35b34

9 files changed

Lines changed: 867 additions & 387 deletions

File tree

src/main/java/org/apache/sysds/runtime/util/Py4jConverterUtils.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.nio.ByteOrder;
2424
import java.nio.charset.StandardCharsets;
2525

26+
import org.apache.log4j.Logger;
2627
import org.apache.sysds.common.Types;
2728
import org.apache.sysds.runtime.DMLRuntimeException;
2829
import org.apache.sysds.runtime.frame.data.columns.Array;
@@ -35,6 +36,7 @@
3536
* Utils for converting python data to java.
3637
*/
3738
public class Py4jConverterUtils {
39+
private static final Logger LOG = Logger.getLogger(Py4jConverterUtils.class);
3840
public static MatrixBlock convertPy4JArrayToMB(byte[] data, int rlen, int clen) {
3941
return convertPy4JArrayToMB(data, rlen, clen, false, Types.ValueType.FP64);
4042
}
@@ -63,6 +65,45 @@ public static MatrixBlock convertSciPyCOOToMB(byte[] data, byte[] row, byte[] co
6365
return mb;
6466
}
6567

68+
public static MatrixBlock convertSciPyCSRToMB(byte[] data, byte[] indices, byte[] indptr, int rlen, int clen, int nnz) {
69+
LOG.debug("Converting compressed sparse row matrix to MatrixBlock");
70+
MatrixBlock mb = new MatrixBlock(rlen, clen, true);
71+
mb.allocateSparseRowsBlock(false);
72+
ByteBuffer dataBuf = ByteBuffer.wrap(data);
73+
dataBuf.order(ByteOrder.nativeOrder());
74+
ByteBuffer indicesBuf = ByteBuffer.wrap(indices);
75+
indicesBuf.order(ByteOrder.nativeOrder());
76+
ByteBuffer indptrBuf = ByteBuffer.wrap(indptr);
77+
indptrBuf.order(ByteOrder.nativeOrder());
78+
79+
// Read indptr array to get row boundaries
80+
int[] rowPtrs = new int[rlen + 1];
81+
for(int i = 0; i <= rlen; i++) {
82+
rowPtrs[i] = indptrBuf.getInt();
83+
}
84+
85+
// Iterate through each row
86+
for(int row = 0; row < rlen; row++) {
87+
int startIdx = rowPtrs[row];
88+
int endIdx = rowPtrs[row + 1];
89+
90+
// Set buffer positions to the start of this row
91+
dataBuf.position(startIdx * Double.BYTES);
92+
indicesBuf.position(startIdx * Integer.BYTES);
93+
94+
// Process all non-zeros in this row sequentially
95+
for(int idx = startIdx; idx < endIdx; idx++) {
96+
double val = dataBuf.getDouble();
97+
int colIndex = indicesBuf.getInt();
98+
mb.set(row, colIndex, val);
99+
}
100+
}
101+
102+
mb.recomputeNonZeros();
103+
mb.examSparsity();
104+
return mb;
105+
}
106+
66107
public static MatrixBlock allocateDenseOrSparse(int rlen, int clen, boolean isSparse) {
67108
MatrixBlock ret = new MatrixBlock(rlen, clen, isSparse);
68109
ret.allocateBlock();
@@ -208,6 +249,7 @@ private static void readBufferIntoArray(ByteBuffer buffer, Array<?> array, Types
208249
public static byte[] convertMBtoPy4JDenseArr(MatrixBlock mb) {
209250
byte[] ret = null;
210251
if(mb.isInSparseFormat()) {
252+
LOG.debug("Converting sparse matrix to dense");
211253
mb.sparseToDense();
212254
}
213255

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
.. -------------------------------------------------------------
2+
..
3+
.. Licensed to the Apache Software Foundation (ASF) under one
4+
.. or more contributor license agreements. See the NOTICE file
5+
.. distributed with this work for additional information
6+
.. regarding copyright ownership. The ASF licenses this file
7+
.. to you under the Apache License, Version 2.0 (the
8+
.. "License"); you may not use this file except in compliance
9+
.. with the License. You may obtain a copy of the License at
10+
..
11+
.. http://www.apache.org/licenses/LICENSE-2.0
12+
..
13+
.. Unless required by applicable law or agreed to in writing,
14+
.. software distributed under the License is distributed on an
15+
.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
.. KIND, either express or implied. See the License for the
17+
.. specific language governing permissions and limitations
18+
.. under the License.
19+
..
20+
.. ------------------------------------------------------------
21+
22+
scaleRobustApply
23+
================
24+
25+
.. autofunction:: systemds.operator.algorithm.scaleRobustApply

src/main/python/systemds/context/systemds_context.py

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,17 +29,19 @@
2929
import sys
3030
import struct
3131
import traceback
32+
import warnings
3233
from contextlib import contextmanager
3334
from glob import glob
3435
from queue import Queue
3536
from subprocess import PIPE, Popen
3637
from threading import Thread
37-
from time import sleep, time
38+
from time import sleep
3839
from typing import Dict, Iterable, Sequence, Tuple, Union
3940
from concurrent.futures import ThreadPoolExecutor
4041

4142
import numpy as np
4243
import pandas as pd
44+
import scipy.sparse as sp
4345
from py4j.java_gateway import GatewayParameters, JavaGateway, Py4JNetworkError
4446
from systemds.operator import (
4547
Frame,
@@ -77,6 +79,7 @@ class SystemDSContext(object):
7779
_FIFO_JAVA2PY_PIPES = []
7880
_data_transfer_mode = 0
7981
_multi_pipe_enabled = False
82+
_sparse_data_transfer = True
8083
_logging_initialized = False
8184
_executor_pool = ThreadPoolExecutor(max_workers=os.cpu_count() * 2 or 4)
8285

@@ -89,6 +92,7 @@ def __init__(
8992
py4j_logging_level: int = 50,
9093
data_transfer_mode: int = 1,
9194
multi_pipe_enabled: bool = False,
95+
sparse_data_transfer: bool = True,
9296
):
9397
"""Starts a new instance of SystemDSContext, in which the connection to a JVM systemds instance is handled
9498
Any new instance of this SystemDS Context, would start a separate new JVM.
@@ -103,14 +107,26 @@ def __init__(
103107
The logging levels are as follows: 10 DEBUG, 20 INFO, 30 WARNING, 40 ERROR, 50 CRITICAL.
104108
:param py4j_logging_level: The logging level for Py4j to use, since all communication to the JVM is done through this,
105109
it can be verbose if not set high.
106-
:param data_transfer_mode: default 0,
110+
:param data_transfer_mode: default 0, 0 for py4j, 1 for using pipes (on unix systems)
111+
:param multi_pipe_enabled: default False, if True, use multiple pipes for data transfer
112+
only used if data_transfer_mode is 1.
113+
.. experimental:: This parameter is experimental and may be removed in a future version.
114+
:param sparse_data_transfer: default True, if True, use optimized sparse matrix transfer,
115+
if False, convert sparse matrices to dense arrays before transfer
107116
"""
108117

118+
if multi_pipe_enabled:
119+
warnings.warn(
120+
"The 'multi_pipe_enabled' parameter is experimental and may be removed in a future version.",
121+
DeprecationWarning,
122+
stacklevel=2,
123+
)
109124
self.__setup_logging(logging_level, py4j_logging_level)
110125
self.__start(port, capture_stdout)
111126
self.capture_stats(capture_statistics)
112127
self._log.debug("Started JVM and SystemDS python context manager")
113128
self.__setup_data_transfer(data_transfer_mode, multi_pipe_enabled)
129+
self._sparse_data_transfer = sparse_data_transfer
114130

115131
def __setup_data_transfer(self, data_transfer_mode=0, multi_pipe_enabled=False):
116132
self._data_transfer_mode = data_transfer_mode
@@ -771,14 +787,14 @@ def scalar(self, v: Dict[str, VALID_INPUT_TYPES]) -> Scalar:
771787

772788
def from_numpy(
773789
self,
774-
mat: np.array,
790+
mat: Union[np.ndarray, sp.spmatrix],
775791
*args: Sequence[VALID_INPUT_TYPES],
776792
**kwargs: Dict[str, VALID_INPUT_TYPES],
777793
) -> Matrix:
778-
"""Generate DAGNode representing matrix with data given by a numpy array, which will be sent to SystemDS
779-
on need.
794+
"""Generate DAGNode representing matrix with data given by a numpy array or scipy sparse matrix,
795+
which will be sent to SystemDS on need.
780796
781-
:param mat: the numpy array
797+
:param mat: the numpy array or scipy sparse matrix
782798
:param args: unnamed parameters
783799
:param kwargs: named parameters
784800
:return: A Matrix

src/main/python/systemds/operator/algorithm/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,7 @@
178178
from .builtin.scale import scale
179179
from .builtin.scaleApply import scaleApply
180180
from .builtin.scaleMinMax import scaleMinMax
181+
from .builtin.scaleRobustApply import scaleRobustApply
181182
from .builtin.selectByVarThresh import selectByVarThresh
182183
from .builtin.ses import ses
183184
from .builtin.setdiff import setdiff
@@ -377,6 +378,7 @@
377378
'scale',
378379
'scaleApply',
379380
'scaleMinMax',
381+
'scaleRobustApply',
380382
'selectByVarThresh',
381383
'ses',
382384
'setdiff',
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
# -------------------------------------------------------------
2+
#
3+
# Licensed to the Apache Software Foundation (ASF) under one
4+
# or more contributor license agreements. See the NOTICE file
5+
# distributed with this work for additional information
6+
# regarding copyright ownership. The ASF licenses this file
7+
# to you under the Apache License, Version 2.0 (the
8+
# "License"); you may not use this file except in compliance
9+
# with the License. You may obtain a copy of the License at
10+
#
11+
# http://www.apache.org/licenses/LICENSE-2.0
12+
#
13+
# Unless required by applicable law or agreed to in writing,
14+
# software distributed under the License is distributed on an
15+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
# KIND, either express or implied. See the License for the
17+
# specific language governing permissions and limitations
18+
# under the License.
19+
#
20+
# -------------------------------------------------------------
21+
22+
# Autogenerated By : src/main/python/generator/generator.py
23+
# Autogenerated From : scripts/builtin/scaleRobustApply.dml
24+
25+
from typing import Dict, Iterable
26+
27+
from systemds.operator import OperationNode, Matrix, Frame, List, MultiReturn, Scalar
28+
from systemds.utils.consts import VALID_INPUT_TYPES
29+
30+
31+
def scaleRobustApply(X: Matrix,
32+
med: Matrix,
33+
q1: Matrix,
34+
q3: Matrix):
35+
"""
36+
Apply robust scaling using precomputed medians and IQRs
37+
38+
39+
40+
:param X: Input feature matrix of shape n-by-m
41+
:param med: Column medians (Q2) of shape 1-by-m
42+
:param q1: Column first quantiles (Q1) of shape 1-by-m
43+
:param q3: Column first quantiles (Q3) of shape 1-by-m
44+
:return: Scaled output matrix of shape n-by-m
45+
"""
46+
47+
params_dict = {'X': X, 'med': med, 'q1': q1, 'q3': q3}
48+
return Matrix(X.sds_context,
49+
'scaleRobustApply',
50+
named_input_nodes=params_dict)

0 commit comments

Comments
 (0)