diff options
Diffstat (limited to 'src/dctv/query_execution.cpp')
-rw-r--r-- | src/dctv/query_execution.cpp | 393 |
1 files changed, 393 insertions, 0 deletions
diff --git a/src/dctv/query_execution.cpp b/src/dctv/query_execution.cpp new file mode 100644 index 0000000..9b97f2b --- /dev/null +++ b/src/dctv/query_execution.cpp @@ -0,0 +1,393 @@ +// Copyright (C) 2020 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#include "query_execution.h" + +#include "awaitable_qe_call.h" +#include "input_channel.h" +#include "io_spec.h" +#include "operator_context.h" +#include "output_channel.h" +#include "pyerrfmt.h" +#include "pyiter.h" +#include "pylog.h" +#include "pyobj.h" +#include "pyparsetuple.h" +#include "query_cache.h" +#include "string_table.h" +#include "pyseq.h" + +namespace dctv { + +static +unique_obj_pyref<QueryExecution> +py_make_query_execution(PyObject*, pyref args, pyref kwargs) +{ + PARSEPYARGS( + (pyref, plan) + (OPTIONAL_ARGS_FOLLOW) + (KWONLY_ARGS_FOLLOW) + (pyref, env) + (pyref, progress_callback) + (pyref, qc) + (pyref, perf_callback) + )(args, kwargs); + QueryExecution::Config config; + config.plan = py2vec_fast( + plan, + [](pyref query_action) { return query_action.notnull().addref(); }); + config.env = env + ? env.addref() + : make_pydict(); + config.progress_callback = progress_callback + ? progress_callback.addref() + : addref(Py_None); + config.qc = qc + ? qc.addref_as<QueryCache>() + : make_pyobj<QueryCache>(QueryCacheConfig()); + config.perf_callback = perf_callback && perf_callback != Py_None + ? perf_callback.addref() + : unique_pyref(); + return QueryExecution::make(std::move(config)); +} + +unique_obj_pyref<QueryExecution> +QueryExecution::make(Config config) +{ + // Grab plan from config before moving the rest of config to QE + Vector<unique_pyref> plan = std::move(config.plan); + unique_obj_pyref<QueryExecution> qe = + make_pyobj<QueryExecution>(std::move(config)); + QueryDb query_db; + int ordinal = 1; // Not zero, so we can detect init. + for (unique_pyref& query_action : plan) + OperatorContext::install(qe.get(), + ordinal++, + std::move(query_action), + &query_db); + return qe; +} + +QueryExecution::QueryExecution(Config config) + : block_size(config.qc->get_block_size()), + qc(std::move(config.qc)), + env(std::move(config.env)), + progress_callback(std::move(config.progress_callback)) +{ + // make() consumes config.plan + assume(this->qc); + assume(this->env); + if (config.perf_callback) + this->perf_info.emplace(PerfInfo{std::move(config.perf_callback)}); + this->qc->note_execution_in_progress(this); +} + +QueryExecution::~QueryExecution() noexcept +{ + this->py_clear(); +} + +void +QueryExecution::flush_pending_score_recomputations() +{ + while (!this->re_score_queue.empty()) { + OperatorContext* op = &this->re_score_queue.front(); + assume(op->link_state == LinkState::ON_RUN_QUEUE_NEED_RESCORE); + assume(op->link_qe == this); + assume(op->link_ref == op); + assume(op->link_score.is_valid()); + Score score = op->compute_score(); + if (score != op->link_score) { + auto it = this->run_queue.iterator_to(*op); + assume(it != this->run_queue.end()); + this->run_queue.erase(it); + op->link_score = score; + auto [_, inserted] = this->run_queue.insert(*op); + assume(inserted); + } + op->link_state = LinkState::ON_RUN_QUEUE_VALID_SCORE; + this->re_score_queue.pop_front(); + } +} + +unique_pyref +QueryExecution::next() +{ + if (this->current_operator) + throw_pyerr_msg(PyExc_RuntimeError, + "recursive query execution not allowed"); + for (;;) { + this->flush_pending_score_recomputations(); + if (this->run_queue.empty()) + break; + OperatorContext* oc = &*this->run_queue.rbegin(); + assume(this->current_operator == nullptr); + this->current_operator = oc; + FINALLY(this->current_operator = nullptr); + if (unique_pyref yield_value = oc->turn_crank(); yield_value) + return yield_value; + } + return {}; // Terminate iteration +} + +void +QueryExecution::add_operator(unique_op_ref op) +{ + assume(op); + assume(op->link_state == LinkState::ORPHANED); + assume(!op->link_score.is_valid()); + assume(!op->link_qe); + assume(!op->link_ref); + op->link_score = op->compute_score(); + // We don't throw after this point. + assume(op->link_score.is_valid()); + auto [it, inserted] = this->run_queue.insert(*op); + assume(inserted); + op->link_qe = this; + op->link_state = LinkState::ON_RUN_QUEUE_VALID_SCORE; + op->link_ref = std::move(op); // Must be last. +} + +void +QueryExecution::remove_operator_on_close(OperatorContext* oc) noexcept +{ + assume(oc->link_state != LinkState::ORPHANED); + assume(oc->link_qe == this); + assume(oc->link_ref == oc); + if (oc->link_state == LinkState::ON_RUN_QUEUE_NEED_RESCORE) + this->remove_from_re_score_queue(oc); + assume(oc->link_state == LinkState::ON_RUN_QUEUE_VALID_SCORE); + auto it = this->run_queue.iterator_to(*oc); + assume(it != this->run_queue.end()); + this->run_queue.erase(it); + assume(oc->link_ref == oc); + oc->link_state = LinkState::ORPHANED; + oc->link_qe = nullptr; + oc->link_ref.reset(); // May be the last reference and delete oc! +} + +void +QueryExecution::add_to_re_score_queue(OperatorContext* oc) noexcept +{ + assume(oc->link_state == LinkState::ON_RUN_QUEUE_VALID_SCORE); + assume(oc->link_qe == this); + this->re_score_queue.push_front(*oc); + oc->link_state = LinkState::ON_RUN_QUEUE_NEED_RESCORE; +} + +void +QueryExecution::remove_from_re_score_queue(OperatorContext* oc) noexcept +{ + assume(oc->link_state == LinkState::ON_RUN_QUEUE_NEED_RESCORE); + auto it = this->re_score_queue.iterator_to(*oc); + assume(it != this->re_score_queue.end()); + this->re_score_queue.erase(it); + oc->link_state = LinkState::ON_RUN_QUEUE_VALID_SCORE; +} + +static +unique_pyref +async_yield_to_query_runner(PyObject*, pyref what) +{ + return make_qe_call_yield_to_query_runner(what.addref()); +} + +static +unique_pyref +async_io(PyObject*, pyref args) +{ + return make_qe_call_io_multi(args.as_unsafe<PyTupleObject>()); +} + +static +unique_pyref +async_dummy(PyObject*) +{ + return make_qe_call_io_single(IoDummy()); +} + +static +unique_pyref +async_setup(PyObject*, pyref args) +{ + return make_qe_call_setup(args.as_unsafe<PyTupleObject>()); +} + +int +QueryExecution::py_traverse(visitproc visit, void* arg) const noexcept +{ + // re_score_queue is a subset of run_queue, so there's no need to scan it + for (const OperatorContext& oc : this->run_queue) { + assume(oc.link_ref.get() == &oc); + Py_VISIT(oc.link_ref.get()); + } + Py_VISIT(this->qc.get()); + Py_VISIT(this->env.get()); + Py_VISIT(this->progress_callback.get()); + if (this->perf_info) + Py_VISIT(this->perf_info->callback.get()); + return 0; +} + +int +QueryExecution::py_clear() noexcept +{ + if (this->qc) { + this->qc->note_execution_done(this); + this->qc.reset(); + } + + // It shouldn't be possible to get into an isolated reference cycle + // or destroy the QE while we're processing a query. + assume(!this->current_operator); + + // Closing an operator context removes it from all queues. + while (!this->run_queue.empty()) + this->run_queue.root()->close(); + assert(this->re_score_queue.empty()); + + this->env.reset(); + this->progress_callback.reset(); + this->perf_info.reset(); + return 0; +} + +void +QueryExecution::accumulate_perf( + pyref query_action, + const perf::Sample& accumulated_perf) +{ + if (this->perf_info) + call(this->perf_info->callback, + query_action.notnull().addref(), + accumulated_perf.to_py()); +} + +QueryExecution* +query_execution_check(PyObject* ref) +{ + return pyref(ref).as<QueryExecution>().get(); +} + +unique_pyarray +make_uninit_hunk_array(obj_pyref<QueryExecution> qe, + unique_dtype dtype, + npy_intp nelem) +{ + return make_uninit_hunk_array(qe->get_qc(), std::move(dtype), nelem); +} + +StringTable* +st_from_qe(QueryExecution* qe) +{ + return qe->get_qc()->get_st(); +} + +static +unique_pyref +QueryExecution_get_current_action(PyObject* self, void*) +{ + OperatorContext* oc = pyref(self).as_unsafe<QueryExecution>() + ->get_current_operator(); + if (!oc) + throw_pyerr_msg(PyExc_RuntimeError, "no current action"); + return addref(oc->get_query_action()); +} + +static +unique_pyref +QueryExecution_get_st(PyObject* self, void*) +{ + return xaddref(pyref(self).as_unsafe<QueryExecution>() + ->get_qc()->get_st()); +} + +PyGetSetDef QueryExecution::pygetset[] = { + make_getset("current_action", + "Currently-executing QueryAction", + wraperr<QueryExecution_get_current_action>()), + make_getset("st", + "Get the string table for this execution", + wraperr<QueryExecution_get_st>()), + { 0 }, +}; + +PyMethodDef QueryExecution::pymethods[] = { + make_methoddef("async_yield_to_query_runner", + wraperr<&async_yield_to_query_runner>(), + METH_STATIC | METH_O, + "For operator: suspend query execution; deliver value"), + make_methoddef("async_setup", + wraperr<&async_setup>(), + METH_STATIC | METH_VARARGS, + "Handshake with query executor on operator creation"), + make_methoddef("async_io", + wraperr<&async_io>(), + METH_STATIC | METH_VARARGS, + "Perform IO for query execution"), + make_methoddef("async_dummy", + wraperr<&async_dummy>(), + METH_STATIC | METH_NOARGS, + "Dummy async IO operation to simplify Python"), + make_methoddef("make", + wraperr<py_make_query_execution>(), + METH_STATIC | METH_VARARGS | METH_KEYWORDS, + "Create a new QueryExecution"), + { 0 }, +}; + +PyMemberDef QueryExecution::pymembers[] = { + make_memberdef("env", + T_OBJECT, + (offsetof(QueryExecution, env) + + unique_pyref::get_pyval_offset()), + READONLY, + "Caller-specified query environment"), + make_memberdef("qc", + T_OBJECT, + (offsetof(QueryExecution, qc) + + unique_pyref::get_pyval_offset()), + READONLY, + "Query cache used by the current query execution"), + make_memberdef("progress_callback", + T_OBJECT, + (offsetof(QueryExecution, progress_callback) + + unique_pyref::get_pyval_offset()), + READONLY, + "Progress callback or None"), + make_memberdef("block_size", + T_PYSSIZET, + offsetof(QueryExecution, block_size), + READONLY, + "Preferred block size"), + { 0 }, +}; + +PyTypeObject QueryExecution::pytype = make_py_type<QueryExecution>( + "dctv._native.QueryExecution", + "Query execution context", + [](PyTypeObject* t) { + t->tp_iter = return_self; + t->tp_iternext = wraperr<&QueryExecution::next>(); + t->tp_getset = QueryExecution::pygetset; + t->tp_members = QueryExecution::pymembers; + t->tp_methods = QueryExecution::pymethods; + }); + +void +init_query_execution(pyref m) +{ + register_type(m, &QueryExecution::pytype); +} + +} // namespace dctv |