Koalas: pandas API on Apache Spark
We fixed a critical bug introduced in Koalas 1.0.0 (#1609). If we call DataFrame.rename with the columns parameter after some operations on the DataFrame, the operations will be lost:
>>> kdf = ks.DataFrame([[1, 2, 3, 4], [5, 6, 7, 8]], columns=["A", "B", "C", "D"])
>>> kdf1 = kdf + 1
>>> kdf1
A B C D
0 2 3 4 5
1 6 7 8 9
>>> kdf1.rename(columns={"A": "aa", "B": "bb"})
aa bb C D
0 1 2 3 4
1 5 6 7 8
This should be:
>>> kdf1.rename(columns={"A": "aa", "B": "bb"})
aa bb C D
0 2 3 4 5
1 6 7 8 9
We implemented many APIs and features equivalent to pandas', such as plotting, grouping, windowing, I/O, and transformation, and Koalas 1.0.0 now reaches close to 80% coverage of the pandas API.
Apache Spark 3.0 is now supported in Koalas 1.0 (#1586, #1558). Koalas does not require any code change to use Spark 3.0. More than 3400 fixes landed in Apache Spark 3.0, and Koalas shares most of those fixes across its many components.
Spark 3.0 also brings performance improvements to the Koalas APIs that execute Python native functions internally via pandas UDFs, for example DataFrame.apply and DataFrame.apply_batch (#1508).
With Apache Spark 3.0, Koalas supports the latest Python 3.8, which has many significant improvements (#1587); see also the Python 3.8.0 release notes.
The spark accessor was introduced in Koalas 1.0.0 so that Koalas users can leverage existing PySpark APIs more easily (#1530). For example, you can apply PySpark functions as below:
import databricks.koalas as ks
import pyspark.sql.functions as F
kss = ks.Series([1, 2, 3, 4])
kss.spark.apply(lambda s: F.collect_list(s))
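The accessor also exposes spark.transform on Series (added in this release), which applies a Column-to-Column function while keeping the Series length unchanged, whereas spark.apply above also permits aggregate expressions such as collect_list. A minimal sketch:

import pyspark.sql.functions as F

# Take the natural log of each element; the length is preserved.
kss.spark.transform(lambda c: F.log(c))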
In the early versions, it was required to use Koalas instances as the return type hints for functions that return pandas instances, which looks slightly awkward.

def pandas_div(pdf) -> koalas.DataFrame[float, float]:
    # pdf is a pandas DataFrame.
    return pdf[['B', 'C']] / pdf[['B', 'C']]

df = ks.DataFrame({'A': ['a', 'a', 'b'], 'B': [1, 2, 3], 'C': [4, 6, 5]})
df.groupby('A').apply(pandas_div)
In Koalas 1.0.0 with Python 3.7+, you can also use pandas instances in the return type as below:

def pandas_div(pdf) -> pandas.DataFrame[float, float]:
    return pdf[['B', 'C']] / pdf[['B', 'C']]
In addition, new type hinting is experimentally introduced in order to allow users to specify column names in the type hints as below (#1577):

def pandas_div(pdf) -> pandas.DataFrame['B': float, 'C': float]:
    return pdf[['B', 'C']] / pdf[['B', 'C']]
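With the named type hints, the output columns keep their names, B and C, instead of falling back to positional defaults such as c0 and c1. Reusing df from the earlier snippet:

df.groupby('A').apply(pandas_div)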
See also the guide in the Koalas documentation (#1584) for more details.
Previously, in-place updates happened only within each DataFrame or Series; now the behavior follows pandas' in-place updates, and an update on one side also updates the other (#1592).
For example, each of the snippets below updates kdf as well.
import numpy as np
import databricks.koalas as ks

# fillna with inplace=True also updates the parent DataFrame.
kdf = ks.DataFrame({"x": [np.nan, 2, 3, 4, np.nan, 6]})
kser = kdf.x
kser.fillna(0, inplace=True)

# Setting a value on the Series via loc also updates the parent DataFrame.
kdf = ks.DataFrame({"x": [np.nan, 2, 3, 4, np.nan, 6]})
kser = kdf.x
kser.loc[2] = 30

# Conversely, updating the DataFrame also updates the derived Series.
kdf = ks.DataFrame({"x": [np.nan, 2, 3, 4, np.nan, 6]})
kser = kdf.x
kdf.loc[2, 'x'] = 30
If the DataFrame and Series are connected, in-place updates on one are reflected in the other.
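To illustrate with the first snippet above, kdf reflects the filled values afterwards (expected output, following pandas in-place semantics):

>>> kser.fillna(0, inplace=True)
>>> kdf
     x
0  0.0
1  2.0
2  3.0
3  4.0
4  0.0
5  6.0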
compute.ops_on_diff_frames
In Koalas 1.0.0, the restriction of compute.ops_on_diff_frames was considerably loosened (#1522, #1554). For example, operations such as the ones below, which can be expensive due to the shuffle under the hood, can now be performed without enabling compute.ops_on_diff_frames.
df + df + df
df['foo'] = df['bar']['baz']
df[['x', 'y']] = df[['x', 'y']].fillna(0)
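Operations across two genuinely different DataFrames still require the option. A minimal sketch using ks.set_option to enable it temporarily:

import databricks.koalas as ks

# Allow operations between different DataFrames (may shuffle under the hood).
ks.set_option("compute.ops_on_diff_frames", True)
kdf1 = ks.DataFrame({"a": [1, 2, 3]})
kdf2 = ks.DataFrame({"a": [4, 5, 6]})
kdf1 + kdf2  # combines two different frames via a join under the hood
ks.reset_option("compute.ops_on_diff_frames")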
DataFrame:
- __bool__ (#1526)
- explode (#1507)
- spark.apply (#1536)
- spark.schema (#1530)
- spark.print_schema (#1530)
- spark.frame (#1530)
- spark.cache (#1530)
- spark.persist (#1530)
- spark.hint (#1530)
- spark.to_table (#1530)
- spark.to_spark_io (#1530)
- spark.explain (#1530)
- spark.apply (#1530)
- mad (#1538)
- __abs__ (#1561)

Series:
- item (#1502, #1518)
- divmod (#1397)
- rdivmod (#1397)
- unstack (#1501)
- mad (#1503)
- __bool__ (#1526)
- to_markdown (#1510)
- spark.apply (#1536)
- spark.data_type (#1530)
- spark.nullable (#1530)
- spark.column (#1530)
- spark.transform (#1530)
- filter (#1511)
- __abs__ (#1561)
- bfill (#1580)
- ffill (#1580)

Index:
- __bool__ (#1526)
- spark.data_type (#1530)
- spark.column (#1530)
- spark.transform (#1530)
- get_level_values (#1517)
- delete (#1165)
- __abs__ (#1561)
- holds_integer (#1547)

MultiIndex:
- __bool__ (#1526)
- spark.data_type (#1530)
- spark.column (#1530)
- spark.transform (#1530)
- get_level_values (#1517)
- delete (#1165)
- __abs__ (#1561)
- holds_integer (#1547)

Along with the following improvements:
apply and transform improvements
We added support for positional/keyword arguments in apply, apply_batch, transform, and transform_batch on DataFrame, Series, and GroupBy. (#1484, #1485, #1486)
>>> ks.range(10).apply(lambda a, b, c: a + b + c, args=(1,), c=3)
id
0 4
1 5
2 6
3 7
4 8
5 9
6 10
7 11
8 12
9 13
>>> ks.range(10).transform_batch(lambda pdf, a, b, c: pdf.id + a + b + c, 1, 2, c=3)
0 6
1 7
2 8
3 9
4 10
5 11
6 12
7 13
8 14
9 15
Name: id, dtype: int64
>>> kdf = ks.DataFrame(
... {"a": [1, 2, 3, 4, 5, 6], "b": [1, 1, 2, 3, 5, 8], "c": [1, 4, 9, 16, 25, 36]},
... columns=["a", "b", "c"])
>>> kdf.groupby(["a", "b"]).apply(lambda x, y, z: x + x.min() + y + z, 1, z=2)
a b c
0 5 5 5
1 7 5 11
2 9 7 21
3 11 9 35
4 13 13 53
5 15 19 75
We added spark_schema and print_schema to inspect the underlying Spark schema. (#1446)
>>> kdf = ks.DataFrame({'a': list('abc'),
... 'b': list(range(1, 4)),
... 'c': np.arange(3, 6).astype('i1'),
... 'd': np.arange(4.0, 7.0, dtype='float64'),
... 'e': [True, False, True],
... 'f': pd.date_range('20130101', periods=3)},
... columns=['a', 'b', 'c', 'd', 'e', 'f'])
>>> # Print the schema out in Spark’s DDL formatted string
>>> kdf.spark_schema().simpleString()
'struct<a:string,b:bigint,c:tinyint,d:double,e:boolean,f:timestamp>'
>>> kdf.spark_schema(index_col='index').simpleString()
'struct<index:bigint,a:string,b:bigint,c:tinyint,d:double,e:boolean,f:timestamp>'
>>> # Print out the schema, the same as Spark DataFrame.printSchema()
>>> kdf.print_schema()
root
|-- a: string (nullable = false)
|-- b: long (nullable = false)
|-- c: byte (nullable = false)
|-- d: double (nullable = false)
|-- e: boolean (nullable = false)
|-- f: timestamp (nullable = false)
>>> kdf.print_schema(index_col='index')
root
|-- index: long (nullable = false)
|-- a: string (nullable = false)
|-- b: long (nullable = false)
|-- c: byte (nullable = false)
|-- d: double (nullable = false)
|-- e: boolean (nullable = false)
|-- f: timestamp (nullable = false)
We fixed many bugs of GroupBy, and added the following new feature:

SeriesGroupBy:
- filter (#1483)

Koalas documentation was redesigned with a better theme, pydata-sphinx-theme. Please check out the new Koalas documentation site.
transform_batch and apply_batch
We added APIs that enable you to directly transform and apply a function against a Koalas Series or DataFrame. map_in_pandas is deprecated and has been renamed to apply_batch.
import databricks.koalas as ks

kdf = ks.DataFrame({'a': [1, 2, 3], 'b': [4, 5, 6]})

def pandas_plus(pdf):
    return pdf + 1  # should always return the same length as the input

kdf.transform_batch(pandas_plus)

import databricks.koalas as ks

kdf = ks.DataFrame({'a': [1, 2, 3], 'b': [4, 5, 6]})

def pandas_plus(pdf):
    return pdf[pdf.a > 1]  # allows arbitrary length

kdf.apply_batch(pandas_plus)
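For reference, the results you would expect from each snippet above, using each snippet's own pandas_plus (a small, single-partition example; the displayed row order may vary with partitioning):

>>> kdf.transform_batch(pandas_plus)  # same length as the input
   a  b
0  2  5
1  3  6
2  4  7
>>> kdf.apply_batch(pandas_plus)  # arbitrary length allowed
   a  b
1  2  5
2  3  6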
Please also check the Transform and apply a function section in the Koalas documentation.
We added the following new features:

DataFrame:
- truncate (#1408)
- hint (#1415)

SeriesGroupBy:
- unique (#1426)

Index:
- spark_column (#1438)

Series:
- spark_column (#1438)

MultiIndex:
- spark_column (#1438)

We added PyArrow>=0.15 support back (#1110).
Note that, when working with pyarrow>=0.15 and pyspark<3.0, Koalas will set the environment variable ARROW_PRE_0_15_IPC_FORMAT=1 if it does not already exist, as per the instruction in SPARK-29367. However, this will NOT work if a Spark context has already been launched; in that case, you have to manage the environment variable yourself.
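One way to manage it yourself is to set the variable before the Spark context is launched; a minimal sketch:

import os

# Must be set before any Spark context starts (see SPARK-29367).
os.environ["ARROW_PRE_0_15_IPC_FORMAT"] = "1"

import databricks.koalas as ks  # import Koalas only after setting the variable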
We added the broadcast function in namespace.py (#1360).
You can use it with merge, join, and update, which invoke a join operation in Spark, when you know that one of the DataFrames is small enough to fit in memory; the resulting broadcast join can be much more performant than a shuffle-based join.
For example:
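Assuming two Koalas DataFrames, df1 and df2, where df2 is the small one (hypothetical data for illustration):

>>> df1 = ks.DataFrame({'lkey': ['foo', 'bar', 'baz', 'foo'], 'value': [1, 2, 3, 5]})
>>> df2 = ks.DataFrame({'rkey': ['foo', 'bar', 'baz', 'foo'], 'value': [5, 6, 7, 8]})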
>>> merged = df1.merge(ks.broadcast(df2), left_on='lkey', right_on='rkey')
>>> merged.explain()
== Physical Plan ==
...
...BroadcastHashJoin...
...
We added the persist function to specify the storage level when caching (#1381), and also the storage_level property to check the current storage level (#1385).
>>> with df.cache() as cached_df:
... print(cached_df.storage_level)
...
Disk Memory Deserialized 1x Replicated
>>> with df.persist(pyspark.StorageLevel.MEMORY_ONLY) as cached_df:
... print(cached_df.storage_level)
...
Memory Serialized 1x Replicated
We added the following new features:

DataFrame:
- to_markdown (#1377)
- squeeze (#1389)

Series:
- squeeze (#1389)
- asof (#1366)

Along with the following improvements:
- iloc.__setitem__ with the other Series from the same DataFrame (#1388)
- loc/iloc.__setitem__ (#1391)
- __setitem__ for loc/iloc with DataFrame (#1394)

We continued to improve the loc indexer and added support for slice-based column selection (#1351).
>>> from databricks import koalas as ks
>>> df = ks.DataFrame({'a':list('abcdefghij'), 'b':list('abcdefghij'), 'c': range(10)})
>>> df.loc[:, "b":"c"]
b c
0 a 0
1 b 1
2 c 2
3 d 3
4 e 4
5 f 5
6 g 6
7 h 7
8 i 8
9 j 9
We also added support for slices as row selection in the loc indexer for multi-indexes (#1344).
>>> from databricks import koalas as ks
>>> import pandas as pd
>>> df = ks.DataFrame({'a': range(3)}, index=pd.MultiIndex.from_tuples([("a", "b"), ("a", "c"), ("b", "d")]))
>>> df.loc[("a", "c"): "b"]
a
a c 1
b d 2
We continued to improve the iloc indexer to support iterable indexes as row selection (#1338).
>>> from databricks import koalas as ks
>>> df = ks.DataFrame({'a':list('abcdefghij'), 'b':list('abcdefghij')})
>>> df.iloc[[-1, 1, 2, 3]]
a b
1 b b
2 c c
3 d d
9 j j
We also added basic support for setting values via loc and iloc on a Series (#1367).
>>> from databricks import koalas as ks
>>> kser = ks.Series([1, 2, 3], index=["cobra", "viper", "sidewinder"])
>>> kser.loc[kser % 2 == 1] = -kser
>>> kser
cobra -1
viper 2
sidewinder -3
We added the following new features:

DataFrame:
- take (#1292)
- eval (#1359)

Series:
- dot (#1136)
- take (#1357)
- combine_first (#1290)

Index:
- droplevel (#1340)
- union (#1348)
- take (#1357)
- asof (#1350)

MultiIndex:
- droplevel (#1340)
- unique (#1342)
- union (#1348)
- take (#1357)

iloc
We improved the iloc indexer to support slices as row selection. (#1335)
For example,
>>> kdf = ks.DataFrame({'a':list('abcdefghij')})
>>> kdf
a
0 a
1 b
2 c
3 d
4 e
5 f
6 g
7 h
8 i
9 j
>>> kdf.iloc[2:5]
a
2 c
3 d
4 e
>>> kdf.iloc[2:-3:2]
a
2 c
4 e
6 g
>>> kdf.iloc[5:]
a
5 f
6 g
7 h
8 i
9 j
>>> kdf.iloc[5:2]
Empty DataFrame
Columns: [a]
Index: []
We added links to the previous talks in our documentation (#1319).
You can find many useful talks from previous events, and we will keep the list updated:
https://koalas.readthedocs.io/en/latest/getting_started/videos.html
We added the following new features:

DataFrame:
- stack (#1329)

Series:
- repeat (#1328)

Index:
- difference (#1325)
- repeat (#1328)

MultiIndex:
- difference (#1325)
- repeat (#1328)

We added pandas 1.0 support (#1197, #1299), and Koalas can now work with pandas 1.0.
We implemented the DataFrame.map_in_pandas API (#1276) so that Koalas can run any arbitrary function that takes and returns a pandas DataFrame against a Koalas DataFrame. See the example below:
>>> import databricks.koalas as ks
>>> df = ks.DataFrame({'A': range(2000), 'B': range(2000)})
>>> def query_func(pdf):
...     num = 1995
...     return pdf.query('A > @num')
...
>>> df.map_in_pandas(query_func)
A B
1996 1996 1996
1997 1997 1997
1998 1998 1998
1999 1999 1999
As a development-only change, we added Black integration (#1301). Now all code style is standardized automatically by running ./dev/reformat, and the style is checked as part of ./dev/lint-python.
We added the following new features:

DataFrame:
- query (#1273)
- unstack (#1295)

Along with the following improvements:
- DataFrame.describe() to support multi-index columns (#1279)
- drop_duplicates (#1303)

head ordering
Since Koalas doesn't guarantee row ordering, head could return some rows from any distributed partition, and the result was not deterministic, which might confuse users.
We added a configuration, compute.ordered_head (#1231); if it is set to True, Koalas performs natural ordering beforehand and the result is the same as pandas'.
The default value is False because the ordering causes a performance overhead.
>>> kdf = ks.DataFrame({'a': range(10)})
>>> pdf = kdf.to_pandas()
>>> pdf.head(3)
a
0 0
1 1
2 2
>>> kdf.head(3)
a
5 5
6 6
7 7
>>> kdf.head(3)
a
0 0
1 1
2 2
>>> ks.options.compute.ordered_head = True
>>> kdf.head(3)
a
0 0
1 1
2 2
>>> kdf.head(3)
a
0 0
1 1
2 2
We started trying to use GitHub Actions for CI. (#1254, #1265, #1264, #1267, #1269)
We made the following fixes and improvements:

DataFrame:
- DataFrame/Series.clip to preserve its index (#1232)
- DataFrame.sort_values when multi-index columns are used (#1238)
- fillna not to change index values (#1241)
- DataFrame.__setitem__ with tuple-named Series (#1245)
- corr to support multi-index columns (#1246)
- print() of a Series to match pandas' output (#1250)

iat indexer
We continued to improve indexers. Now, the iat indexer is supported too (#1062).
>>> df = ks.DataFrame([[0, 2, 3], [0, 4, 1], [10, 20, 30]],
... columns=['A', 'B', 'C'])
>>> df
A B C
0 0 2 3
1 0 4 1
2 10 20 30
>>> df.iat[1, 2]
1
We added the following new features:

koalas.Index:
- equals (#1216)
- identical (#1215)
- is_all_dates (#1205)
- append (#1163)
- to_frame (#1187)

koalas.MultiIndex:
- equals (#1216)
- identical (#1215)
- swaplevel (#1105)
- is_all_dates (#1205)
- is_monotonic_increasing (#1183)
- is_monotonic_decreasing (#1183)
- append (#1163)
- to_frame (#1187)

koalas.DataFrameGroupBy:
- describe (#1168)

Along with the following improvement:
- DataFrame.idxmin/idxmax (#1198)