aboutsummaryrefslogtreecommitdiff
path: root/src/dctv/query_execution.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/dctv/query_execution.cpp')
-rw-r--r--src/dctv/query_execution.cpp393
1 files changed, 393 insertions, 0 deletions
diff --git a/src/dctv/query_execution.cpp b/src/dctv/query_execution.cpp
new file mode 100644
index 0000000..9b97f2b
--- /dev/null
+++ b/src/dctv/query_execution.cpp
@@ -0,0 +1,393 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+#include "query_execution.h"
+
+#include "awaitable_qe_call.h"
+#include "input_channel.h"
+#include "io_spec.h"
+#include "operator_context.h"
+#include "output_channel.h"
+#include "pyerrfmt.h"
+#include "pyiter.h"
+#include "pylog.h"
+#include "pyobj.h"
+#include "pyparsetuple.h"
+#include "query_cache.h"
+#include "string_table.h"
+#include "pyseq.h"
+
+namespace dctv {
+
+// Python-facing factory behind QueryExecution.make(): unpacks the
+// Python arguments into a QueryExecution::Config and forwards that
+// to QueryExecution::make().
+//
+// Per the PARSEPYARGS markers, `plan` is the required argument and
+// `env`, `progress_callback`, `qc` and `perf_callback` are optional
+// keyword-only arguments.
+static
+unique_obj_pyref<QueryExecution>
+py_make_query_execution(PyObject*, pyref args, pyref kwargs)
+{
+  PARSEPYARGS(
+    (pyref, plan)
+    (OPTIONAL_ARGS_FOLLOW)
+    (KWONLY_ARGS_FOLLOW)
+    (pyref, env)
+    (pyref, progress_callback)
+    (pyref, qc)
+    (pyref, perf_callback)
+  )(args, kwargs);
+
+  QueryExecution::Config config;
+
+  // Each plan entry must be non-null; the config holds its own
+  // reference to every one of them.
+  config.plan = py2vec_fast(
+    plan,
+    [](pyref action) { return action.notnull().addref(); });
+
+  // No environment supplied: start from a fresh dict.
+  if (env)
+    config.env = env.addref();
+  else
+    config.env = make_pydict();
+
+  // No progress callback supplied: store None.
+  if (progress_callback)
+    config.progress_callback = progress_callback.addref();
+  else
+    config.progress_callback = addref(Py_None);
+
+  // No query cache supplied: create one from a default
+  // QueryCacheConfig.
+  if (qc)
+    config.qc = qc.addref_as<QueryCache>();
+  else
+    config.qc = make_pyobj<QueryCache>(QueryCacheConfig());
+
+  // Unlike progress_callback, a missing *or* None perf callback is
+  // stored as an empty reference.
+  const bool have_perf_callback =
+    perf_callback && perf_callback != Py_None;
+  if (have_perf_callback)
+    config.perf_callback = perf_callback.addref();
+  else
+    config.perf_callback = unique_pyref();
+
+  return QueryExecution::make(std::move(config));
+}
+
+// Create a QueryExecution from CONFIG and install one operator
+// context per plan entry, handing each a distinct ordinal.
+unique_obj_pyref<QueryExecution>
+QueryExecution::make(Config config)
+{
+  // The plan is pulled out first; the remainder of the config is
+  // moved into the QueryExecution itself.
+  Vector<unique_pyref> actions = std::move(config.plan);
+  unique_obj_pyref<QueryExecution> qe =
+    make_pyobj<QueryExecution>(std::move(config));
+
+  // One QueryDb is shared by all install() calls for this plan.
+  QueryDb query_db;
+
+  // Ordinals start at one, not zero, so we can detect init.
+  int next_ordinal = 1;
+  for (unique_pyref& action : actions) {
+    OperatorContext::install(qe.get(),
+                             next_ordinal,
+                             std::move(action),
+                             &query_db);
+    ++next_ordinal;
+  }
+  return qe;
+}
+
+// Take ownership of the query cache, environment and callbacks from
+// CONFIG and register this execution with the query cache.
+//
+// NOTE(review): block_size reads config.qc in the initializer list
+// while qc moves from it; this relies on block_size being declared
+// before qc in the class -- confirm against the header.
+QueryExecution::QueryExecution(Config config)
+  : block_size(config.qc->get_block_size()),
+    qc(std::move(config.qc)),
+    env(std::move(config.env)),
+    progress_callback(std::move(config.progress_callback))
+{
+  // config.plan is deliberately ignored here: make() consumes it.
+  assume(qc);
+  assume(env);
+
+  // Perf tracking is only set up when a perf callback was provided.
+  if (config.perf_callback) {
+    perf_info.emplace(PerfInfo{std::move(config.perf_callback)});
+  }
+
+  qc->note_execution_in_progress(this);
+}
+
+// Destruction reuses py_clear() to release everything this
+// execution still holds.
+QueryExecution::~QueryExecution() noexcept
+{
+  py_clear();
+}
+
+// Drain re_score_queue: recompute the score of every operator
+// waiting there and, if the score changed, re-insert the operator
+// into run_queue under its new score.
+void
+QueryExecution::flush_pending_score_recomputations()
+{
+  while (!this->re_score_queue.empty()) {
+    OperatorContext& pending = this->re_score_queue.front();
+    assume(pending.link_state == LinkState::ON_RUN_QUEUE_NEED_RESCORE);
+    assume(pending.link_qe == this);
+    assume(pending.link_ref == &pending);
+    assume(pending.link_score.is_valid());
+
+    Score fresh_score = pending.compute_score();
+    if (fresh_score != pending.link_score) {
+      // The operator is taken out of run_queue before link_score is
+      // updated and put back afterwards.
+      auto position = this->run_queue.iterator_to(pending);
+      assume(position != this->run_queue.end());
+      this->run_queue.erase(position);
+      pending.link_score = fresh_score;
+      auto [_, inserted] = this->run_queue.insert(pending);
+      assume(inserted);
+    }
+
+    // Only once the score is settled does the operator leave the
+    // re-score queue.
+    pending.link_state = LinkState::ON_RUN_QUEUE_VALID_SCORE;
+    this->re_score_queue.pop_front();
+  }
+}
+
+// Iterator step (installed as tp_iternext): keep cranking operators
+// until one of them yields a value, which is returned to the caller.
+// An empty reference is returned once run_queue is empty.
+//
+// Raises RuntimeError if called while an operator is already
+// running.
+unique_pyref
+QueryExecution::next()
+{
+  if (this->current_operator) {
+    throw_pyerr_msg(PyExc_RuntimeError,
+                    "recursive query execution not allowed");
+  }
+
+  for (;;) {
+    // Scores must be up to date before an operator is picked.
+    this->flush_pending_score_recomputations();
+    if (this->run_queue.empty())
+      return {};  // Terminate iteration
+
+    // Run the operator at the back of run_queue.
+    OperatorContext* chosen = &*this->run_queue.rbegin();
+    assume(this->current_operator == nullptr);
+    this->current_operator = chosen;
+    FINALLY(this->current_operator = nullptr);
+
+    unique_pyref yielded = chosen->turn_crank();
+    if (yielded)
+      return yielded;
+  }
+}
+
+// Adopt an orphaned operator: compute its initial score, put it on
+// run_queue and store the owning reference inside the operator
+// itself (link_ref).
+void
+QueryExecution::add_operator(unique_op_ref op)
+{
+  assume(op);
+  OperatorContext* const oc = op.get();
+  assume(oc->link_state == LinkState::ORPHANED);
+  assume(!oc->link_score.is_valid());
+  assume(!oc->link_qe);
+  assume(!oc->link_ref);
+
+  oc->link_score = oc->compute_score();
+
+  // We don't throw after this point.
+  assume(oc->link_score.is_valid());
+  auto [position, inserted] = this->run_queue.insert(*oc);
+  assume(inserted);
+  oc->link_state = LinkState::ON_RUN_QUEUE_VALID_SCORE;
+  oc->link_qe = this;
+
+  // Must be last: this hands the caller's reference over to the
+  // operator, leaving `op` moved-from.
+  oc->link_ref = std::move(op);
+}
+
+// Detach OC from this execution while it is being closed: take it
+// off the re-score queue (if it is there) and off run_queue, mark it
+// orphaned, and drop the reference held in link_ref.
+void
+QueryExecution::remove_operator_on_close(OperatorContext* oc) noexcept
+{
+  assume(oc->link_state != LinkState::ORPHANED);
+  assume(oc->link_qe == this);
+  assume(oc->link_ref == oc);
+
+  // Leaving the re-score queue puts the operator back into the
+  // VALID_SCORE state that is checked next.
+  if (oc->link_state == LinkState::ON_RUN_QUEUE_NEED_RESCORE) {
+    this->remove_from_re_score_queue(oc);
+  }
+  assume(oc->link_state == LinkState::ON_RUN_QUEUE_VALID_SCORE);
+
+  auto position = this->run_queue.iterator_to(*oc);
+  assume(position != this->run_queue.end());
+  this->run_queue.erase(position);
+
+  assume(oc->link_ref == oc);
+  oc->link_qe = nullptr;
+  oc->link_state = LinkState::ORPHANED;
+
+  // Keep this as the final statement: it may be the last reference
+  // and delete oc!
+  oc->link_ref.reset();
+}
+
+// Flag OC, which must be on run_queue with a valid score, as needing
+// its score recomputed by pushing it onto the front of
+// re_score_queue.
+void
+QueryExecution::add_to_re_score_queue(OperatorContext* oc) noexcept
+{
+  assume(oc->link_state == LinkState::ON_RUN_QUEUE_VALID_SCORE);
+  assume(oc->link_qe == this);
+
+  re_score_queue.push_front(*oc);
+  oc->link_state = LinkState::ON_RUN_QUEUE_NEED_RESCORE;
+}
+
+// Undo add_to_re_score_queue(): unlink OC from re_score_queue and
+// return it to the VALID_SCORE state. The score itself is not
+// recomputed here.
+void
+QueryExecution::remove_from_re_score_queue(OperatorContext* oc) noexcept
+{
+  assume(oc->link_state == LinkState::ON_RUN_QUEUE_NEED_RESCORE);
+
+  auto position = re_score_queue.iterator_to(*oc);
+  assume(position != re_score_queue.end());
+  re_score_queue.erase(position);
+
+  oc->link_state = LinkState::ON_RUN_QUEUE_VALID_SCORE;
+}
+
+// Static method async_yield_to_query_runner(what): wrap a new
+// reference to WHAT in a yield-to-query-runner call object.
+static
+unique_pyref
+async_yield_to_query_runner(PyObject*, pyref what)
+{
+  auto payload = what.addref();
+  return make_qe_call_yield_to_query_runner(std::move(payload));
+}
+
+// Static method async_io(*args): build a multi-IO call object from
+// the positional-argument tuple.
+static
+unique_pyref
+async_io(PyObject*, pyref args)
+{
+  // Registered as METH_VARARGS, so ARGS is viewed as a tuple without
+  // a type check.
+  return make_qe_call_io_multi(
+    args.as_unsafe<PyTupleObject>());
+}
+
+// Static method async_dummy(): build a single-IO call object
+// carrying an IoDummy.
+static
+unique_pyref
+async_dummy(PyObject*)
+{
+  return make_qe_call_io_single(IoDummy{});
+}
+
+// Static method async_setup(*args): build a setup call object from
+// the positional-argument tuple.
+static
+unique_pyref
+async_setup(PyObject*, pyref args)
+{
+  // Registered as METH_VARARGS, so ARGS is viewed as a tuple without
+  // a type check.
+  return make_qe_call_setup(
+    args.as_unsafe<PyTupleObject>());
+}
+
+// GC traversal: visit every Python object this execution keeps
+// alive. The parameter names `visit` and `arg` are the ones the
+// Py_VISIT macro expects.
+int
+QueryExecution::py_traverse(visitproc visit, void* arg) const noexcept
+{
+  // Operators: walking run_queue is enough, because re_score_queue
+  // only ever holds operators that are also on run_queue.
+  for (const OperatorContext& op : this->run_queue) {
+    assume(op.link_ref.get() == &op);
+    Py_VISIT(op.link_ref.get());
+  }
+
+  // Directly held references.
+  Py_VISIT(this->qc.get());
+  Py_VISIT(this->env.get());
+  Py_VISIT(this->progress_callback.get());
+  if (this->perf_info) {
+    Py_VISIT(this->perf_info->callback.get());
+  }
+  return 0;
+}
+
+// Release everything this execution holds: tell the query cache we
+// are done, close all operators, and drop the remaining references.
+// Also called from the destructor. Always returns 0.
+int
+QueryExecution::py_clear() noexcept
+{
+  // The qc check means the cache is notified only once, even if
+  // py_clear() runs again later.
+  if (this->qc) {
+    this->qc->note_execution_done(this);
+    this->qc.reset();
+  }
+
+  // It shouldn't be possible to get into an isolated reference cycle
+  // or destroy the QE while we're processing a query.
+  assume(!this->current_operator);
+
+  // Closing an operator context removes it from all queues, so each
+  // close() shrinks run_queue until it is empty.
+  while (!this->run_queue.empty()) {
+    this->run_queue.root()->close();
+  }
+  assert(this->re_score_queue.empty());
+
+  this->env.reset();
+  this->progress_callback.reset();
+  this->perf_info.reset();
+  return 0;
+}
+
+// Report ACCUMULATED_PERF for QUERY_ACTION to the perf callback.
+// Does nothing when no perf callback was configured.
+void
+QueryExecution::accumulate_perf(
+    pyref query_action,
+    const perf::Sample& accumulated_perf)
+{
+  if (!this->perf_info)
+    return;
+
+  // The callback receives (query_action, perf sample as a Python
+  // object); QUERY_ACTION must be non-null.
+  call(this->perf_info->callback,
+       query_action.notnull().addref(),
+       accumulated_perf.to_py());
+}
+
+// Convert REF to a QueryExecution pointer using the checked as<>()
+// conversion.
+QueryExecution*
+query_execution_check(PyObject* ref)
+{
+  pyref borrowed(ref);
+  return borrowed.as<QueryExecution>().get();
+}
+
+// Convenience overload: allocate an uninitialized hunk array of
+// NELEM elements of DTYPE using the query cache that belongs to QE.
+unique_pyarray
+make_uninit_hunk_array(obj_pyref<QueryExecution> qe,
+                       unique_dtype dtype,
+                       npy_intp nelem)
+{
+  return make_uninit_hunk_array(
+    qe->get_qc(),
+    std::move(dtype),
+    nelem);
+}
+
+// Return the string table of the query cache that QE uses.
+StringTable*
+st_from_qe(QueryExecution* qe)
+{
+  auto&& cache = qe->get_qc();
+  return cache->get_st();
+}
+
+// Getter for the `current_action` attribute: a new reference to the
+// query action of the operator that is currently running.
+//
+// Raises RuntimeError when no operator is running.
+static
+unique_pyref
+QueryExecution_get_current_action(PyObject* self, void*)
+{
+  auto qe = pyref(self).as_unsafe<QueryExecution>();
+  OperatorContext* running = qe->get_current_operator();
+  if (running == nullptr)
+    throw_pyerr_msg(PyExc_RuntimeError, "no current action");
+  return addref(running->get_query_action());
+}
+
+// Getter for the `st` attribute: the string table of this
+// execution's query cache, returned via xaddref().
+static
+unique_pyref
+QueryExecution_get_st(PyObject* self, void*)
+{
+  auto qe = pyref(self).as_unsafe<QueryExecution>();
+  return xaddref(qe->get_qc()->get_st());
+}
+
+// Computed attributes exposed on QueryExecution. Only getters are
+// supplied here.
+PyGetSetDef QueryExecution::pygetset[] = {
+  make_getset(
+    "current_action",
+    "Currently-executing QueryAction",
+    wraperr<&QueryExecution_get_current_action>()),
+  make_getset(
+    "st",
+    "Get the string table for this execution",
+    wraperr<&QueryExecution_get_st>()),
+  { 0 },  // Sentinel: end of table.
+};
+
+// Methods exposed on QueryExecution. All of them are static methods.
+PyMethodDef QueryExecution::pymethods[] = {
+  // Calls made by operators while a query is running.
+  make_methoddef(
+    "async_yield_to_query_runner",
+    wraperr<&async_yield_to_query_runner>(),
+    METH_STATIC | METH_O,
+    "For operator: suspend query execution; deliver value"),
+  make_methoddef(
+    "async_setup",
+    wraperr<&async_setup>(),
+    METH_STATIC | METH_VARARGS,
+    "Handshake with query executor on operator creation"),
+  make_methoddef(
+    "async_io",
+    wraperr<&async_io>(),
+    METH_STATIC | METH_VARARGS,
+    "Perform IO for query execution"),
+  make_methoddef(
+    "async_dummy",
+    wraperr<&async_dummy>(),
+    METH_STATIC | METH_NOARGS,
+    "Dummy async IO operation to simplify Python"),
+  // Factory.
+  make_methoddef(
+    "make",
+    wraperr<&py_make_query_execution>(),
+    METH_STATIC | METH_VARARGS | METH_KEYWORDS,
+    "Create a new QueryExecution"),
+  { 0 },  // Sentinel: end of table.
+};
+
+// Read-only data members exposed on QueryExecution. For the object
+// members, the offset is the offset of the reference holder inside
+// QueryExecution plus unique_pyref::get_pyval_offset().
+PyMemberDef QueryExecution::pymembers[] = {
+  make_memberdef(
+    "env",
+    T_OBJECT,
+    (offsetof(QueryExecution, env) +
+     unique_pyref::get_pyval_offset()),
+    READONLY,
+    "Caller-specified query environment"),
+  make_memberdef(
+    "qc",
+    T_OBJECT,
+    (offsetof(QueryExecution, qc) +
+     unique_pyref::get_pyval_offset()),
+    READONLY,
+    "Query cache used by the current query execution"),
+  make_memberdef(
+    "progress_callback",
+    T_OBJECT,
+    (offsetof(QueryExecution, progress_callback) +
+     unique_pyref::get_pyval_offset()),
+    READONLY,
+    "Progress callback or None"),
+  make_memberdef(
+    "block_size",
+    T_PYSSIZET,
+    offsetof(QueryExecution, block_size),
+    READONLY,
+    "Preferred block size"),
+  { 0 },  // Sentinel: end of table.
+};
+
+// Python type object for QueryExecution. The type is its own
+// iterator: tp_iter returns self and tp_iternext drives next().
+PyTypeObject QueryExecution::pytype = make_py_type<QueryExecution>(
+  "dctv._native.QueryExecution",
+  "Query execution context",
+  [](PyTypeObject* type) {
+    // Attribute, member and method tables.
+    type->tp_getset = QueryExecution::pygetset;
+    type->tp_members = QueryExecution::pymembers;
+    type->tp_methods = QueryExecution::pymethods;
+    // Iterator protocol.
+    type->tp_iter = return_self;
+    type->tp_iternext = wraperr<&QueryExecution::next>();
+  });
+
+// Module initialization hook: register the QueryExecution type with
+// module M.
+void
+init_query_execution(pyref m)
+{
+  PyTypeObject* type = &QueryExecution::pytype;
+  register_type(m, type);
+}
+
+} // namespace dctv