Add implementation-specific descriptors #343


Merged
merged 13 commits on Dec 22, 2022
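This diff threads an `opts` mapping of implementation-specific descriptor options (taken from the updater) through `graphblas/core/agg.py`, so every intermediate object built during an aggregation is created with the same options. A minimal caller-side sketch of the intent, assuming a SuiteSparse-style option such as `nthreads` can be passed as a keyword (the option name is an illustrative assumption, not taken from this diff):

```python
import graphblas as gb

A = gb.Matrix.from_coo([0, 0, 1], [0, 1, 1], [1.0, 2.0, 3.0], nrows=2, ncols=2)

# Hypothetical option: `nthreads` rides along as an implementation-specific
# descriptor setting and would be forwarded (as `opts`) to each intermediate
# `.new(**opts)` call made while evaluating the composite `mean` aggregator.
row_means = A.reduce_rowwise(gb.agg.mean).new(nthreads=4)
print(row_means)
```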
141 changes: 73 additions & 68 deletions graphblas/core/agg.py
@@ -163,6 +163,7 @@ def _new(self, updater, expr, *, in_composite=False):
return parent._as_vector()
return

opts = updater.opts
if agg._composite is not None:
# Masks are applied throughout the aggregation, including composite aggregations.
# Aggregations done while `in_composite is True` should return the updater parent
@@ -173,14 +174,14 @@ def _new(self, updater, expr, *, in_composite=False):
for cur_agg in agg._composite:
cur_agg = cur_agg[self.type] # Hopefully works well enough
arg = expr.construct_output(cur_agg.return_type)
results.append(cur_agg._new(arg(mask=mask), expr, in_composite=True))
final_expr = agg._finalize(*results)
results.append(cur_agg._new(arg(mask=mask, **opts), expr, in_composite=True))
final_expr = agg._finalize(*results, opts)
if expr.cfunc_name == "GrB_Matrix_reduce_Aggregator":
updater << final_expr
elif expr.cfunc_name.startswith("GrB_Vector_reduce") or expr.cfunc_name.startswith(
"GrB_Matrix_reduce"
):
final = final_expr.new()
final = final_expr.new(**opts)
updater << final[0]
else:
raise NotImplementedError(f"{agg.name} with {expr.cfunc_name}")
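The composite branch above evaluates each sub-aggregator with the same mask and `opts`, then combines the partial results in `agg._finalize(*results, opts)`. A rough dense-NumPy analogue of that pattern for a `mean`-style aggregator, treating "nonzero" as a stand-in for "stored element" (which is only an approximation of GraphBLAS semantics):

```python
import numpy as np

def composite_mean(A, axis=1):
    count = (A != 0).sum(axis=axis)  # analogue of a "count" sub-aggregator
    total = A.sum(axis=axis)         # analogue of a "sum" sub-aggregator
    return total / count             # finalize step, like _mean_finalize's truediv(x & c)

A = np.array([[1.0, 2.0, 0.0],
              [0.0, 3.0, 5.0]])
print(composite_mean(A))  # [1.5 4. ]
```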
@@ -192,7 +193,7 @@ def _new(self, updater, expr, *, in_composite=False):
return

if agg._custom is not None:
return agg._custom(self, updater, expr, in_composite=in_composite)
return agg._custom(self, updater, expr, opts, in_composite=in_composite)
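Custom aggregators now receive the options mapping as a positional argument, as the call above shows. A sketch of the signature this implies for a `_custom` callable; the body is illustrative only:

```python
def _my_custom(agg, updater, expr, opts, *, in_composite=False):
    # Forward the same options to every intermediate object, for example
    #     tmp = some_expression.new(**opts)
    # so implementation-specific descriptor settings apply at each step.
    ...
```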

semiring = get_typed_op(agg._semiring, self.type, agg._initdtype)
if expr.cfunc_name == "GrB_Matrix_reduce_Aggregator":
@@ -201,12 +202,12 @@ def _new(self, updater, expr, *, in_composite=False):
orig_updater = updater
if agg._finalize is not None:
step1 = expr.construct_output(semiring.return_type)
updater = step1(mask=updater.kwargs.get("mask"))
updater = step1(mask=updater.kwargs.get("mask"), **opts)
if expr.method_name == "reduce_columnwise":
A = A.T
size = A._ncols
init = expr._new_vector(agg._initdtype, size=size)
init[...] = agg._initval # O(1) dense vector in SuiteSparse 5
init(**opts)[...] = agg._initval # O(1) dense vector in SuiteSparse 5
if agg._switch:
updater << semiring(init @ A.T)
else:
@@ -220,17 +221,17 @@ def _new(self, updater, expr, *, in_composite=False):
v = expr.args[0]
step1 = expr._new_vector(semiring.return_type, size=1)
init = expr._new_matrix(agg._initdtype, nrows=v._size, ncols=1)
init[...] = agg._initval # O(1) dense column vector in SuiteSparse 5
init(**opts)[...] = agg._initval # O(1) dense column vector in SuiteSparse 5
if agg._switch:
step1 << semiring(init.T @ v)
step1(**opts) << semiring(init.T @ v)
else:
step1 << semiring(v @ init)
step1(**opts) << semiring(v @ init)
if agg._finalize is not None:
finalize = agg._finalize[semiring.return_type]
if step1.dtype == finalize.return_type:
step1 << finalize(step1)
step1(**opts) << finalize(step1)
else:
step1 = finalize(step1).new(finalize.return_type)
step1 = finalize(step1).new(finalize.return_type, **opts)
if in_composite:
return step1
updater << step1[0]
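Both reduce paths above express a reduction as a multiply against a dense object filled with the aggregator's initial value: `semiring(A @ init)` collapses each row to one value, and `semiring(v @ init)` with an n-by-1 column collapses a vector to a single entry. A dense NumPy check of the underlying identity with a plus/times stand-in (real aggregators use their own semirings and respect sparsity):

```python
import numpy as np

A = np.array([[1.0, 2.0],
              [3.0, 4.0]])
v = np.array([5.0, 6.0, 7.0])

init_vec = np.ones(A.shape[1])   # analogue of the O(1) dense init vector
init_col = np.ones((v.size, 1))  # analogue of the n-by-1 dense init matrix

assert np.allclose(A @ init_vec, A.sum(axis=1))  # row-wise reduction
assert np.allclose(v @ init_col, v.sum())        # vector reduced to length 1
```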
@@ -241,23 +242,23 @@ def _new(self, updater, expr, *, in_composite=False):
# This has not been benchmarked or optimized.
# We may be able to intelligently choose the faster path.
init1 = expr._new_vector(agg._initdtype, size=A._ncols)
init1[...] = agg._initval # O(1) dense vector in SuiteSparse 5
init1(**opts)[...] = agg._initval # O(1) dense vector in SuiteSparse 5
step1 = expr._new_vector(semiring.return_type, size=A._nrows)
if agg._switch:
step1 << semiring(init1 @ A.T)
step1(**opts) << semiring(init1 @ A.T)
else:
step1 << semiring(A @ init1)
step1(**opts) << semiring(A @ init1)
init2 = expr._new_matrix(agg._initdtype, nrows=A._nrows, ncols=1)
init2[...] = agg._initval # O(1) dense vector in SuiteSparse 5
init2(**opts)[...] = agg._initval # O(1) dense vector in SuiteSparse 5
semiring2 = agg._semiring2[semiring.return_type]
step2 = expr._new_vector(semiring2.return_type, size=1)
step2 << semiring2(step1 @ init2)
step2(**opts) << semiring2(step1 @ init2)
if agg._finalize is not None:
finalize = agg._finalize[semiring2.return_type]
if step2.dtype == finalize.return_type:
step2 << finalize(step2)
else:
step2 = finalize(step2).new(finalize.return_type)
step2 = finalize(step2).new(finalize.return_type, **opts)
if in_composite:
return step2
updater << step2[0]
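The full-matrix branch reduces in two multiplies: `A @ init1` collapses the columns to get one value per row, then `step1 @ init2` collapses that vector to a single entry before the optional finalize. The same shape argument in dense NumPy, again with plus/times standing in for the aggregator's actual semirings:

```python
import numpy as np

A = np.array([[1.0, 2.0],
              [3.0, 4.0]])

step1 = A @ np.ones(A.shape[1])           # one value per row
step2 = step1 @ np.ones((A.shape[0], 1))  # single value
assert np.allclose(step2, A.sum())        # 10.0
```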
@@ -342,53 +343,53 @@ def __reduce__(self):


# Composite
def _mean_finalize(c, x):
def _mean_finalize(c, x, opts):
return binary.truediv(x & c)


def _ptp_finalize(max, min):
def _ptp_finalize(max, min, opts):
return binary.minus(max & min)


def _varp_finalize(c, x, x2):
def _varp_finalize(c, x, x2, opts):
# <x2> / n - (<x> / n)**2
left = binary.truediv(x2 & c).new()
right = binary.truediv(x & c).new()
right << binary.pow(right, 2)
left = binary.truediv(x2 & c).new(**opts)
right = binary.truediv(x & c).new(**opts)
right(**opts) << binary.pow(right, 2)
return binary.minus(left & right)


def _vars_finalize(c, x, x2):
def _vars_finalize(c, x, x2, opts):
# <x2> / (n-1) - <x>**2 / (n * (n-1))
x << binary.pow(x, 2)
right = binary.truediv(x & c).new()
c << binary.minus(c, 1)
right << binary.truediv(right & c)
left = binary.truediv(x2 & c).new()
x(**opts) << binary.pow(x, 2)
right = binary.truediv(x & c).new(**opts)
c(**opts) << binary.minus(c, 1)
right(**opts) << binary.truediv(right & c)
left = binary.truediv(x2 & c).new(**opts)
return binary.minus(left & right)


def _stdp_finalize(c, x, x2):
val = _varp_finalize(c, x, x2).new()
def _stdp_finalize(c, x, x2, opts):
val = _varp_finalize(c, x, x2, opts).new(**opts)
return unary.sqrt(val)


def _stds_finalize(c, x, x2):
val = _vars_finalize(c, x, x2).new()
def _stds_finalize(c, x, x2, opts):
val = _vars_finalize(c, x, x2, opts).new(**opts)
return unary.sqrt(val)


def _geometric_mean_finalize(c, x):
right = unary.minv["FP64"](c).new()
def _geometric_mean_finalize(c, x, opts):
right = unary.minv["FP64"](c).new(**opts)
return binary.pow(x & right)


def _harmonic_mean_finalize(c, x):
def _harmonic_mean_finalize(c, x, opts):
return binary.truediv(c & x)


def _root_mean_square_finalize(c, x2):
val = binary.truediv(x2 & c).new()
def _root_mean_square_finalize(c, x2, opts):
val = binary.truediv(x2 & c).new(**opts)
return unary.sqrt(val)
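The finalize helpers above combine partial reductions into the final statistic: `_varp_finalize` uses the population identity `<x2>/n - (<x>/n)**2`, `_vars_finalize` the sample form `<x2>/(n-1) - <x>**2/(n*(n-1))`, and the std variants take the square root of those. A quick NumPy check that these identities match the usual definitions (dense data, ignoring sparsity):

```python
import numpy as np

x = np.array([1.0, 2.0, 4.0, 7.0])
n = x.size
s, s2 = x.sum(), (x ** 2).sum()

varp = s2 / n - (s / n) ** 2                  # _varp_finalize identity
vars_ = s2 / (n - 1) - s**2 / (n * (n - 1))   # _vars_finalize identity

assert np.isclose(varp, np.var(x))            # population variance
assert np.isclose(vars_, np.var(x, ddof=1))   # sample variance
assert np.isclose(np.sqrt(varp), np.std(x))   # _stdp_finalize
```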


Expand Down Expand Up @@ -453,6 +454,7 @@ def _argminmaxij(
agg,
updater,
expr,
opts,
*,
in_composite,
monoid,
@@ -462,51 +464,52 @@
if expr.cfunc_name == "GrB_Matrix_reduce_Aggregator":
A = expr.args[0]
if expr.method_name == "reduce_rowwise":
step1 = A.reduce_rowwise(monoid).new()
step1 = A.reduce_rowwise(monoid).new(**opts)

D = step1.diag()

masked = semiring.any_eq(D @ A).new()
masked(mask=masked.V, replace=True) << masked # Could use select
masked = semiring.any_eq(D @ A).new(**opts)
masked(mask=masked.V, replace=True, **opts) << masked # Could use select
init = expr._new_vector(bool, size=A._ncols)
init[...] = False # O(1) dense vector in SuiteSparse 5
init(**opts)[...] = False # O(1) dense vector in SuiteSparse 5
updater << row_semiring(masked @ init)
if in_composite:
return updater.parent
else:
step1 = A.reduce_columnwise(monoid).new()
step1 = A.reduce_columnwise(monoid).new(**opts)

D = step1.diag()

masked = semiring.any_eq(A @ D).new()
masked(mask=masked.V, replace=True) << masked # Could use select
masked = semiring.any_eq(A @ D).new(**opts)
masked(mask=masked.V, replace=True, **opts) << masked # Could use select
init = expr._new_vector(bool, size=A._nrows)
init[...] = False # O(1) dense vector in SuiteSparse 5
init(**opts)[...] = False # O(1) dense vector in SuiteSparse 5
updater << col_semiring(init @ masked)
if in_composite:
return updater.parent
elif expr.cfunc_name.startswith("GrB_Vector_reduce"):
v = expr.args[0]
step1 = v.reduce(monoid, allow_empty=False).new()
masked = binary.eq(v, step1).new()
masked(mask=masked.V, replace=True) << masked # Could use select
step1 = v.reduce(monoid, allow_empty=False).new(**opts)
masked = binary.eq(v, step1).new(**opts)
masked(mask=masked.V, replace=True, **opts) << masked # Could use select
init = expr._new_matrix(bool, nrows=v._size, ncols=1)
init[...] = False # O(1) dense column vector in SuiteSparse 5
step2 = col_semiring(masked @ init).new()
init(**opts)[...] = False # O(1) dense column vector in SuiteSparse 5
step2 = col_semiring(masked @ init).new(**opts)
if in_composite:
return step2
updater << step2[0]
else:
raise NotImplementedError(f"{agg.name} with {expr.cfunc_name}")
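`_argminmaxij` finds the extreme value per row (or column) with a monoid reduce, marks the positions equal to that extreme via `any_eq` plus a value mask, and then reads an index out of the marked matrix with a first-index semiring (`min_firsti`/`min_firstj` below). On dense data the whole dance collapses to `argmax`/`argmin`; a small reference for the tie-breaking it mimics, where the smallest index among equal extremes wins:

```python
import numpy as np

def argmax_rowwise(A):
    row_max = A.max(axis=1, keepdims=True)  # step 1: per-row maximum (monoid reduce)
    hits = A == row_max                     # step 2: positions equal to the extreme
    return hits.argmax(axis=1)              # step 3: smallest matching column index

A = np.array([[1, 9, 9],
              [4, 2, 4]])
print(argmax_rowwise(A))  # [1 0] -- ties resolve toward the smaller index
```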


def _argminmax(agg, updater, expr, *, in_composite, monoid):
def _argminmax(agg, updater, expr, opts, *, in_composite, monoid):
if expr.cfunc_name == "GrB_Matrix_reduce_Aggregator":
if expr.method_name == "reduce_rowwise":
return _argminmaxij(
agg,
updater,
expr,
opts,
in_composite=in_composite,
monoid=monoid,
row_semiring=semiring.min_firstj,
@@ -516,6 +519,7 @@ def _argminmax(agg, updater, expr, *, in_composite, monoid):
agg,
updater,
expr,
opts,
in_composite=in_composite,
monoid=monoid,
row_semiring=semiring.min_firsti,
@@ -526,6 +530,7 @@ def _argminmax(agg, updater, expr, *, in_composite, monoid):
agg,
updater,
expr,
opts,
in_composite=in_composite,
monoid=monoid,
row_semiring=semiring.min_firsti,
@@ -549,52 +554,52 @@ def _argminmax(agg, updater, expr, *, in_composite, monoid):
)


def _first_last(agg, updater, expr, *, in_composite, semiring_):
def _first_last(agg, updater, expr, opts, *, in_composite, semiring_):
if expr.cfunc_name == "GrB_Matrix_reduce_Aggregator":
A = expr.args[0]
if expr.method_name == "reduce_columnwise":
A = A.T
init = expr._new_vector(bool, size=A._ncols)
init[...] = False # O(1) dense vector in SuiteSparse 5
step1 = semiring_(A @ init).new()
init(**opts)[...] = False # O(1) dense vector in SuiteSparse 5
step1 = semiring_(A @ init).new(**opts)
Is, Js = step1.to_coo()

Matrix_ = type(expr._new_matrix(bool))
P = Matrix_.from_coo(Js, Is, 1, nrows=A._ncols, ncols=A._nrows)
mask = step1.diag()
result = semiring.any_first(A @ P).new(mask=mask.S).diag()
result = semiring.any_first(A @ P).new(mask=mask.S, **opts).diag(**opts)

updater << result
if in_composite:
return updater.parent
elif expr.cfunc_name.startswith("GrB_Vector_reduce"):
v = expr.args[0]
init = expr._new_matrix(bool, nrows=v._size, ncols=1)
init[...] = False # O(1) dense matrix in SuiteSparse 5
step1 = semiring_(v @ init).new()
init(**opts)[...] = False # O(1) dense matrix in SuiteSparse 5
step1 = semiring_(v @ init).new(**opts)
index = step1[0].new().value
# `==` instead of `is` automatically triggers index.compute() in dask-graphblas:
if index == None: # noqa
index = 0
if in_composite:
return v[[index]].new()
return v[[index]].new(**opts)
updater << v[index]
else: # GrB_Matrix_reduce
A = expr.args[0]
init1 = expr._new_matrix(bool, nrows=A._ncols, ncols=1)
init1[...] = False # O(1) dense matrix in SuiteSparse 5
step1 = semiring_(A @ init1).new()
init1(**opts)[...] = False # O(1) dense matrix in SuiteSparse 5
step1 = semiring_(A @ init1).new(**opts)
init2 = expr._new_vector(bool, size=A._nrows)
init2[...] = False # O(1) dense vector in SuiteSparse 5
step2 = semiring_(step1.T @ init2).new()
init2(**opts)[...] = False # O(1) dense vector in SuiteSparse 5
step2 = semiring_(step1.T @ init2).new(**opts)
i = step2[0].new().value
# `==` instead of `is` automatically triggers i.compute() in dask-graphblas:
if i == None: # noqa
i = j = 0
else:
j = step1[i, 0].new().value
if in_composite:
return A[i, [j]].new()
return A[i, [j]].new(**opts)
updater << A[i, j]
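`_first_last` first computes, per row, the position of the first (or last) stored element via `semiring_(A @ init)` over a dense Boolean vector, then builds a selection matrix `P` from those positions and extracts the corresponding values with `any_first(A @ P)`. A dense NumPy stand-in for the "first value per row" case, again using nonzero as a proxy for stored (rows with no stored elements simply have no result in the GraphBLAS version):

```python
import numpy as np

def first_rowwise(A):
    first_col = (A != 0).argmax(axis=1)         # position step: first nonzero per row
    return A[np.arange(A.shape[0]), first_col]  # extraction step: value at that position

A = np.array([[0, 7, 3],
              [5, 0, 2]])
print(first_rowwise(A))  # [7 5]
```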


@@ -612,22 +617,22 @@ def _first_last(agg, updater, expr, *, in_composite, semiring_):
)


def _first_last_index(agg, updater, expr, *, in_composite, semiring):
def _first_last_index(agg, updater, expr, opts, *, in_composite, semiring):
if expr.cfunc_name == "GrB_Matrix_reduce_Aggregator":
A = expr.args[0]
if expr.method_name == "reduce_columnwise":
A = A.T
init = expr._new_vector(bool, size=A._ncols)
init[...] = False # O(1) dense vector in SuiteSparse 5
init(**opts)[...] = False # O(1) dense vector in SuiteSparse 5
expr = semiring(A @ init)
updater << expr
if in_composite:
return updater.parent
elif expr.cfunc_name.startswith("GrB_Vector_reduce"):
v = expr.args[0]
init = expr._new_matrix(bool, nrows=v._size, ncols=1)
init[...] = False # O(1) dense matrix in SuiteSparse 5
step1 = semiring(v @ init).new()
init(**opts)[...] = False # O(1) dense matrix in SuiteSparse 5
step1 = semiring(v @ init).new(**opts)
if in_composite:
return step1
updater << step1[0]
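`_first_last_index` is the index-returning variant: the index-selecting semiring applied to `A @ init` (or `v @ init`) already yields the first/last position directly, so no second extraction pass over the values is needed. The dense analogue for the vector case, with nonzero again standing in for stored:

```python
import numpy as np

def first_index(v):
    nz = np.flatnonzero(v)                  # positions of stored (here: nonzero) entries
    return int(nz[0]) if nz.size else None  # None when nothing is stored

print(first_index(np.array([0.0, 0.0, 4.0, 9.0])))  # 2
```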