Scan Class — Scan
Scan
The Scan object is used to return all the records in a specified set (which can be omitted or None). A Scan with a None set returns all the records in the namespace. The scan is invoked using foreach(), results(), or execute_background(). The bins returned can be filtered using select().
See also
Scans and Managing Scans.
Scan Methods
- class aerospike.Scan
- select(bin1[, bin2[, bin3..]])
Set a filter on the record bins resulting from results() or foreach(). If a selected bin does not exist in a record, it will not appear in the bins portion of that record tuple.
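To make the filtering rule concrete, here is a plain-Python model of what select() does to Record Tuples shaped like the ones results() returns. The helper select_bins() is hypothetical and not part of the client (the server does this filtering for you), and the digest slot is set to None only to keep the sample short.

```python
# Local model of the documented select() behavior: only the requested bins
# are kept, and a requested bin that the record lacks is simply absent.
def select_bins(record, *bin_names):
    """Return a new (key, meta, bins) tuple keeping only bin_names."""
    key, meta, bins = record
    kept = {name: bins[name] for name in bin_names if name in bins}
    return key, meta, kept


# Sample Record Tuples; None stands in for the real 20-byte digest.
records = [
    (("test", "test", "key1", None), {"gen": 1, "ttl": 2592000}, {"id": 1, "a": 1}),
    (("test", "test", "key2", None), {"gen": 1, "ttl": 2592000}, {"id": 2, "b": 2}),
]
filtered = [select_bins(r, "id", "a", "zzz") for r in records]
for _, _, bins in filtered:
    print(bins)
# {'id': 1, 'a': 1}
# {'id': 2}
```

Note that bin "b" is dropped because it was not selected, and "zzz" never appears because no record has it.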
- apply(module, function[, arguments])
Apply a record UDF to each record found by the scan.
- Parameters
module (str) – the name of the Lua module.
function (str) – the name of the Lua function within the module.
arguments (list) – optional arguments to pass to the function. NOTE: these arguments must be types supported by Aerospike. See: supported data types. If you need to use an unsupported type (e.g. set or tuple), you can serialize it first with a serializer such as pickle.
- Returns
one of the supported types: int, str, float (double), list, dict (map), bytearray (bytes), bool.
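The pickle advice for unsupported argument types can be sketched as follows, assuming the serialized value is passed as a bytearray (which maps to the server's bytes type). The module and function names in the comment are placeholders. A Lua UDF cannot unpickle the value; it can only store or pass along the opaque bytes, so the reverse step happens in Python code that later reads them back.

```python
import pickle

# A set and a tuple are not Aerospike-supported types, so serialize them
# to bytes first; bytearray maps to the server's bytes type.
unsupported = {"tags": {"red", "blue"}, "point": (3, 4)}
payload = bytearray(pickle.dumps(unsupported))

# Hypothetical argument list for scan.apply("my_module", "my_function", args)
args = ["some_bin", payload]

# Python code that reads the bytes back would reverse the step with pickle.loads.
restored = pickle.loads(bytes(payload))
print(restored == unsupported)  # True
```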
See also
- add_ops(ops)
Add a list of write ops to the scan. When used with Scan.execute_background(), the scan will perform the write ops on any records found. If no predicate is attached to the scan, it will apply ops to all the records in the specified set. See aerospike_helpers for available ops.
- Parameters
ops (list) – a list of write operations generated by the aerospike_helpers, e.g. list_operations, map_operations, etc.
Note
Requires server version >= 4.7.0.
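    import aerospike
    from aerospike_helpers.operations import list_operations
    from aerospike_helpers.operations import operations

    config = {'hosts': [('127.0.0.1', 3000)]}
    client = aerospike.client(config).connect()

    # Example bin names and index; replace with your own.
    str_bin = 'str_bin'    # a bin holding a string (target of append)
    list_bin = 'list_bin'  # a bin holding a list (target of remove_by_index)
    list_index_to_remove = 0

    scan = client.scan('test', 'demo')

    ops = [
        operations.append(str_bin, 'val_to_append'),
        list_operations.list_remove_by_index(list_bin, list_index_to_remove, aerospike.LIST_RETURN_NONE)
    ]

    scan.add_ops(ops)
    job_id = scan.execute_background()
    client.close()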
For a more comprehensive example, see using a list of write ops with Query.execute_background().
- results([policy[, nodename]]) -> list of (key, meta, bins)
Buffer the records resulting from the scan, and return them as a list of records.
- Parameters
policy (dict) – optional Scan Policies.
nodename (str) – optional Node ID of node used to limit the scan to a single node.
- Returns
a list of Record Tuples.
    import aerospike
    import pprint

    pp = pprint.PrettyPrinter(indent=2)
    config = { 'hosts': [ ('127.0.0.1',3000)]}
    client = aerospike.client(config).connect()

    client.put(('test','test','key1'), {'id':1,'a':1},
        policy={'key':aerospike.POLICY_KEY_SEND})
    client.put(('test','test','key2'), {'id':2,'b':2},
        policy={'key':aerospike.POLICY_KEY_SEND})

    scan = client.scan('test', 'test')
    scan.select('id','a','zzz')
    res = scan.results()
    pp.pprint(res)
    client.close()
Note
We expect to see:
    [ ( ( 'test',
          'test',
          u'key2',
          bytearray(b'\xb2\x18\n\xd4\xce\xd8\xba:\x96s\xf5\x9ba\xf1j\xa7t\xeem\x01')),
        { 'gen': 52, 'ttl': 2592000},
        { 'id': 2}),
      ( ( 'test',
          'test',
          u'key1',
          bytearray(b'\x1cJ\xce\xa7\xd4Vj\xef+\xdf@W\xa5\xd8o\x8d:\xc9\xf4\xde')),
        { 'gen': 52, 'ttl': 2592000},
        { 'a': 1, 'id': 1})]
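Because each element of the returned list is a (key, meta, bins) Record Tuple, and the key is (namespace, set, primary key, digest), a common next step is to index the bins by primary key. This minimal sketch uses the expected output shown above as literal data. It assumes the records were written with POLICY_KEY_SEND, as in the example; without it, the primary key slot would be None and you would have to key on the digest instead.

```python
# Record Tuples copied from the expected output: (key, meta, bins),
# where key is (namespace, set, primary key, digest).
res = [
    (("test", "test", "key2", bytearray(b"\xb2\x18\n\xd4\xce\xd8\xba:\x96s\xf5\x9ba\xf1j\xa7t\xeem\x01")),
     {"gen": 52, "ttl": 2592000}, {"id": 2}),
    (("test", "test", "key1", bytearray(b"\x1cJ\xce\xa7\xd4Vj\xef+\xdf@W\xa5\xd8o\x8d:\xc9\xf4\xde")),
     {"gen": 52, "ttl": 2592000}, {"a": 1, "id": 1}),
]

# Index the bins by the user-supplied primary key.
by_user_key = {}
for (ns, set_name, user_key, digest), meta, bins in res:
    by_user_key[user_key] = bins

print(by_user_key["key1"])  # {'a': 1, 'id': 1}
```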
Note
Python client versions >= 3.10.0 support predicate expressions for results(), foreach(), and execute_background(); see predexp. Requires server version >= 4.7.0.
    import aerospike
    from aerospike import predexp
    from aerospike import exception as ex
    import sys

    config = { 'hosts': [('127.0.0.1', 3000)]}
    client = aerospike.client(config).connect()

    # register udf
    try:
        client.udf_put('/path/to/my_udf.lua')
    except ex.AerospikeError as e:
        print("Error: {0} [{1}]".format(e.msg, e.code))
        client.close()
        sys.exit(1)

    # put records and run scan
    try:
        keys = [('test', 'demo', 1), ('test', 'demo', 2), ('test', 'demo', 3)]
        records = [{'number': 1}, {'number': 2}, {'number': 3}]
        for i in range(3):
            client.put(keys[i], records[i])

        scan = client.scan('test', 'demo')

        preds = [  # check that the record has value < 2 or value == 3 in bin 'number'
            predexp.integer_bin('number'),
            predexp.integer_value(2),
            predexp.integer_less(),
            predexp.integer_bin('number'),
            predexp.integer_value(3),
            predexp.integer_equal(),
            predexp.predexp_or(2)
        ]

        policy = {
            'predexp': preds
        }

        records = scan.results(policy)
        print(records)
    except ex.AerospikeError as e:
        print("Error: {0} [{1}]".format(e.msg, e.code))
        sys.exit(1)
    finally:
        client.close()
    # the scan only returns records that match the predexp
    # EXPECTED OUTPUT:
    # [
    #  (('test', 'demo', 1, bytearray(b'\xb7\xf4\xb88\x89\xe2\xdag\xdeh>\x1d\xf6\x91\x9a\x1e\xac\xc4F\xc8')), {'gen': 2, 'ttl': 2591999}, {'number': 1}),
    #  (('test', 'demo', 3, bytearray(b'\xb1\xa5`g\xf6\xd4\xa8\xa4D9\xd3\xafb\xbf\xf8ha\x01\x94\xcd')), {'gen': 13, 'ttl': 2591999}, {'number': 3})
    # ]
    -- contents of my_udf.lua
    function my_udf(rec, bin, offset)
        info("my transform: %s", tostring(record.digest(rec)))
        rec[bin] = rec[bin] + offset
        aerospike:update(rec)
    end
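The preds list in the predicate-expression example reads as a postfix sequence: each comparison consumes the two values pushed before it, and predexp_or(2) combines the two preceding results. The toy evaluator below is a hypothetical local model, not the client's or server's evaluator; its (kind, arg) tokens stand in for the predexp calls, and it only shows why records 1 and 3 match.

```python
import operator


def evaluate(tokens, bins):
    """Evaluate a postfix token list against one record's bins."""
    stack = []
    for kind, arg in tokens:
        if kind == "bin":        # like predexp.integer_bin(name)
            stack.append(bins.get(arg))
        elif kind == "value":    # like predexp.integer_value(v)
            stack.append(arg)
        elif kind == "cmp":      # like integer_less() / integer_equal()
            right = stack.pop()
            left = stack.pop()
            stack.append(left is not None and arg(left, right))
        elif kind == "or":       # like predexp_or(n): combine n results
            operands = [stack.pop() for _ in range(arg)]
            stack.append(any(operands))
    return stack.pop()


preds = [
    ("bin", "number"), ("value", 2), ("cmp", operator.lt),
    ("bin", "number"), ("value", 3), ("cmp", operator.eq),
    ("or", 2),
]
records = [{"number": 1}, {"number": 2}, {"number": 3}]
print([r["number"] for r in records if evaluate(preds, r)])  # [1, 3]
```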
- foreach(callback[, policy[, options[, nodename]]])
Invoke the callback function for each of the records streaming back from the scan.
- Parameters
callback (callable) – the function to invoke for each record.
policy (dict) – optional Scan Policies.
options (dict) – the Scan Options that will apply to the scan.
nodename (str) – optional Node ID of node used to limit the scan to a single node.
Note
A Record Tuple is passed as the argument to the callback function.
    import aerospike
    import pprint

    pp = pprint.PrettyPrinter(indent=2)
    config = { 'hosts': [ ('127.0.0.1',3000)]}
    client = aerospike.client(config).connect()

    client.put(('test','test','key1'), {'id':1,'a':1},
        policy={'key':aerospike.POLICY_KEY_SEND})
    client.put(('test','test','key2'), {'id':2,'b':2},
        policy={'key':aerospike.POLICY_KEY_SEND})

    def show_key(record):
        key, meta, bins = record
        print(key)

    scan = client.scan('test', 'test')
    scan_opts = {
      'concurrent': True,
      'nobins': True
    }
    scan.foreach(show_key, options=scan_opts)
    client.close()
Note
We expect to see:
    ('test', 'test', u'key2', bytearray(b'\xb2\x18\n\xd4\xce\xd8\xba:\x96s\xf5\x9ba\xf1j\xa7t\xeem\x01'))
    ('test', 'test', u'key1', bytearray(b'\x1cJ\xce\xa7\xd4Vj\xef+\xdf@W\xa5\xd8o\x8d:\xc9\xf4\xde'))
Note
To stop the stream, return False from the callback function.
    import aerospike

    config = { 'hosts': [ ('127.0.0.1',3000)]}
    client = aerospike.client(config).connect()

    def limit(lim, result):
        c = [0] # integers are immutable so a list (mutable) is used for the counter
        def key_add(record):
            key, metadata, bins = record
            if c[0] < lim:
                result.append(key)
                c[0] = c[0] + 1
            else:
                return False
        return key_add

    scan = client.scan('test','user')
    keys = []
    scan.foreach(limit(100, keys))
    print(len(keys)) # this will be 100 if the number of matching records >= 100
    client.close()
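The stop-on-False contract can be exercised without a cluster. In this sketch, fake_foreach() is a hypothetical stand-in for Scan.foreach() that stops as soon as the callback returns False, and limit() is the closure from the example rewritten with nonlocal instead of a one-element list.

```python
def fake_foreach(records, callback):
    """Hypothetical local stand-in: stop when the callback returns False."""
    for record in records:
        if callback(record) is False:
            break


def limit(lim, result):
    count = 0

    def key_add(record):
        nonlocal count          # rebind the enclosing counter directly
        key, _meta, _bins = record
        if count >= lim:
            return False        # tells the stream to stop
        result.append(key)
        count += 1
    return key_add


# 250 fake Record Tuples; None stands in for the digest.
stream = [(("test", "user", i, None), {}, {}) for i in range(250)]
keys = []
fake_foreach(stream, limit(100, keys))
print(len(keys))  # 100
```

Using nonlocal (Python 3) lets the inner function update the counter in place, so the mutable-list workaround is no longer needed; returning None for the other records lets the stream continue.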
- execute_background([policy])
Execute a record UDF on records found by the scan in the background. This method returns before the scan has completed. A UDF can be added to the scan with Scan.apply().
- Parameters
policy (dict) – optional Write Policies.
- Returns
a job ID that can be used with Client.job_info() to track the status of the aerospike.JOB_SCAN as it runs in the background.
Note
Python client version 3.10.0 implemented scan execute_background.
    import aerospike
    from aerospike import exception as ex
    import sys
    import time

    config = {"hosts": [("127.0.0.1", 3000)]}
    client = aerospike.client(config).connect()

    # register udf
    try:
        client.udf_put("/path/to/my_udf.lua")
    except ex.AerospikeError as e:
        print("Error: {0} [{1}]".format(e.msg, e.code))
        client.close()
        sys.exit(1)

    # put records and apply udf
    try:
        keys = [("test", "demo", 1), ("test", "demo", 2), ("test", "demo", 3)]
        records = [{"number": 1}, {"number": 2}, {"number": 3}]
        for i in range(3):
            client.put(keys[i], records[i])

        scan = client.scan("test", "demo")
        scan.apply("my_udf", "my_udf", ["number", 10])
        job_id = scan.execute_background()

        # wait for job to finish
        while True:
            response = client.job_info(job_id, aerospike.JOB_SCAN)
            if response["status"] != aerospike.JOB_STATUS_INPROGRESS:
                break
            time.sleep(0.25)

        records = client.get_many(keys)
        print(records)
    except ex.AerospikeError as e:
        print("Error: {0} [{1}]".format(e.msg, e.code))
        sys.exit(1)
    finally:
        client.close()
    # EXPECTED OUTPUT:
    # [
    #  (('test', 'demo', 1, bytearray(b'\xb7\xf4\xb88\x89\xe2\xdag\xdeh>\x1d\xf6\x91\x9a\x1e\xac\xc4F\xc8')), {'gen': 2, 'ttl': 2591999}, {'number': 11}),
    #  (('test', 'demo', 2, bytearray(b'\xaejQ_7\xdeJ\xda\xccD\x96\xe2\xda\x1f\xea\x84\x8c:\x92p')), {'gen': 12, 'ttl': 2591999}, {'number': 12}),
    #  (('test', 'demo', 3, bytearray(b'\xb1\xa5`g\xf6\xd4\xa8\xa4D9\xd3\xafb\xbf\xf8ha\x01\x94\xcd')), {'gen': 13, 'ttl': 2591999}, {'number': 13})
    # ]
    -- contents of my_udf.lua
    function my_udf(rec, bin, offset)
        info("my transform: %s", tostring(record.digest(rec)))
        rec[bin] = rec[bin] + offset
        aerospike:update(rec)
    end
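The polling loop in the execute_background() example can be factored into a reusable helper that also gives up after a deadline, so a stuck job does not block forever. This is a sketch under assumptions: wait_for_job() is a hypothetical name, get_status is any zero-argument callable, and the string statuses in the demo are stand-ins for the client's constants.

```python
import time


def wait_for_job(get_status, in_progress, poll_interval=0.25, timeout=60.0):
    """Poll get_status() until it differs from in_progress or timeout expires.

    With the real client, get_status could be something like
        lambda: client.job_info(job_id, aerospike.JOB_SCAN)["status"]
    and in_progress would be aerospike.JOB_STATUS_INPROGRESS.
    """
    deadline = time.monotonic() + timeout
    while True:
        status = get_status()
        if status != in_progress:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError("background job still in progress")
        time.sleep(poll_interval)


# Simulated status source: in progress twice, then finished.
statuses = iter(["INPROGRESS", "INPROGRESS", "COMPLETED"])
print(wait_for_job(lambda: next(statuses), "INPROGRESS", poll_interval=0.01))
# COMPLETED
```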
Scan Policies
- policy
A dict of optional scan policies which are applicable to Scan.results() and Scan.foreach(). See Policy Options.
- max_retries (int)
  Maximum number of retries before aborting the current transaction. The initial attempt is not counted as a retry.
  If max_retries is exceeded, the transaction will return error AEROSPIKE_ERR_TIMEOUT.
  Default: 0
  Warning
  Database writes that are not idempotent (such as “add”) should not be retried because the write operation may be performed multiple times if the client timed out previous transaction attempts. It’s important to use a distinct write policy for non-idempotent writes which sets max_retries = 0.
- sleep_between_retries (int)
  Milliseconds to sleep between retries. Enter 0 to skip sleep.
  Default: 0
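To see how max_retries and sleep_between_retries interact, here is an illustrative model of the retry loop, not the client's implementation. Python's built-in TimeoutError stands in for AEROSPIKE_ERR_TIMEOUT, and flaky() is a hypothetical operation that times out a set number of times before succeeding.

```python
import time


def run_with_retries(attempt, max_retries=0, sleep_between_retries_ms=0):
    """Model of the policy: one initial attempt plus up to max_retries retries."""
    retries = 0
    while True:
        try:
            return attempt()
        except TimeoutError:          # stand-in for AEROSPIKE_ERR_TIMEOUT
            if retries >= max_retries:
                raise                 # retries exhausted: surface the timeout
            retries += 1
            if sleep_between_retries_ms:
                time.sleep(sleep_between_retries_ms / 1000.0)


def flaky(failures):
    """Hypothetical operation that times out `failures` times, then succeeds."""
    remaining = failures

    def attempt():
        nonlocal remaining
        if remaining > 0:
            remaining -= 1
            raise TimeoutError("simulated timeout")
        return "ok"
    return attempt


print(run_with_retries(flaky(2), max_retries=2, sleep_between_retries_ms=1))  # ok
```

With the default max_retries of 0, only the initial attempt runs, which is the safe setting for non-idempotent writes described in the warning.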
- socket_timeout (int)
  Socket idle timeout in milliseconds when processing a database command.
  If socket_timeout is not 0 and the socket has been idle for at least socket_timeout, both max_retries and total_timeout are checked. If max_retries and total_timeout are not exceeded, the transaction is retried.
  If both socket_timeout and total_timeout are non-zero and socket_timeout > total_timeout, then socket_timeout will be set to total_timeout. If socket_timeout is 0, there will be no socket idle limit.
  Default: 30000
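The clamping rule for socket_timeout can be written as a small function. It encodes only the rule stated above; the function name is hypothetical, and with the real client you would simply pass both values in the policy dict, for example {'socket_timeout': 30000, 'total_timeout': 1000}.

```python
def effective_socket_timeout(socket_timeout, total_timeout):
    """If both are non-zero and socket_timeout exceeds total_timeout,
    socket_timeout is lowered to total_timeout; otherwise it is unchanged."""
    if socket_timeout and total_timeout and socket_timeout > total_timeout:
        return total_timeout
    return socket_timeout


print(effective_socket_timeout(30000, 1000))  # 1000
print(effective_socket_timeout(30000, 0))     # 30000 (no total limit)
print(effective_socket_timeout(0, 1000))      # 0 (no socket idle limit)
```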
- total_timeout (int)
  Total transaction timeout in milliseconds.
  The total_timeout is tracked on the client and sent to the server along with the transaction in the wire protocol. The client will most likely time out first, but the server also has the capability to time out the transaction.
  If total_timeout is not 0 and total_timeout is reached before the transaction completes, the transaction will return error AEROSPIKE_ERR_TIMEOUT. If total_timeout is 0, there will be no total time limit.
  Default: 0
- compress (bool)
  Compress client requests and server responses.
  Use zlib compression on write or batch read commands when the command buffer size is greater than 128 bytes. In addition, tell the server to compress its response on read commands. The server response compression threshold is also 128 bytes.
  This option will increase CPU and memory usage (for extra compressed buffers), but decrease the size of data sent over the network.
  Default: False
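A sketch of the 128-byte threshold described for compress, using the standard-library zlib module. It illustrates only the decision rule; the client's actual buffer layout and compression framing are not modeled, and maybe_compress() is a hypothetical name.

```python
import zlib

COMPRESS_THRESHOLD = 128  # bytes, per the compress policy description


def maybe_compress(buf):
    """Return (compressed?, payload). Only buffers larger than the
    threshold are zlib-compressed; smaller ones are sent as-is."""
    if len(buf) > COMPRESS_THRESHOLD:
        return True, zlib.compress(buf)
    return False, buf


small = b"x" * 100
large = b"record payload " * 50   # 750 bytes of repetitive data
print(maybe_compress(small)[0])              # False
flag, packed = maybe_compress(large)
print(flag, len(packed) < len(large))        # True True
print(zlib.decompress(packed) == large)      # True
```

The trade-off in the policy text shows up directly: compression costs CPU and an extra buffer, and pays off only when the payload is large or repetitive enough to shrink.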
- fail_on_cluster_change (bool)
  Deprecated in 6.0.0. No longer has any effect.
  Abort the scan if the cluster is not in a stable state. Only used for server versions < 4.9.
  Default: False
- durable_delete (bool)
  Perform durable delete (requires Enterprise server version >= 3.10).
  If the transaction results in a record deletion, leave a tombstone for the record.
  Default: False
- records_per_second (int)
  Limit the scan to process records at records_per_second.
  Requires server version >= 4.7.0.
  Default: 0 (no limit).
- expressions (list)
  Compiled Aerospike expressions (see aerospike_helpers) used for filtering records within a transaction.
  Default: None
  Note
  Requires Aerospike server version >= 5.2.
- max_records (int)
  Approximate number of records to return to client. This number is divided by the number of nodes involved in the scan. The actual number of records returned may be less than max_records if node record counts are small and unbalanced across nodes.
  Default: 0 (no limit).
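The per-node split behind max_records explains why fewer records than requested can come back. This hypothetical helper assumes an even integer split across nodes (the exact rounding the client uses is an assumption) and caps each node at the number of records it holds.

```python
def expected_returned(max_records, node_record_counts):
    """Approximate records returned when max_records is split across nodes."""
    if max_records == 0:                          # 0 means no limit
        return sum(node_record_counts)
    per_node = max_records // len(node_record_counts)  # even split (assumed)
    return sum(min(per_node, count) for count in node_record_counts)


print(expected_returned(300, [500, 500, 500]))  # 300
print(expected_returned(300, [500, 50, 500]))   # 250: the small node falls short
print(expected_returned(0, [500, 50, 500]))     # 1050
```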
Scan Options
- options
A dict of optional scan options which are applicable to Scan.foreach().
- priority
  Deprecated in 6.0.0. Scan priority will be removed in a coming release. Scan priority has been replaced by the records_per_second policy; see Scan Policies.
- nobins (bool)
- concurrent (bool)
  Whether to run the scan concurrently on all nodes of the cluster.
  Default: False.
- percent (int)
  Deprecated in version 6.0.0 and will be removed in a coming release. No longer available with server 5.6+. Use scan policy max_records instead.
  Percentage of records to return from the scan.
  Default: 100.
New in version 1.0.39.