DS201: DataStax Enterprise 6 Foundations of Apache Cassandra™
How to start #
bin/cassandra     # for core version
./dse cassandra   # for DSE version

bin/nodetool status    # provides cluster health info

CQL Fundamentals #

-- CQL - very similar to SQL
SELECT * FROM USERS;

-- Keyspaces - similar to schemas in an RDBMS; a keyspace is the top-level namespace/container, requires replication parameters, and contains tables
CREATE KEYSPACE some_keyspace
WITH REPLICATION = {
    'class': 'SimpleStrategy',
    'replication_factor': 1
};

-- USE switches between keyspaces
USE other_keyspace;

-- Tables contain data
CREATE TABLE table1 (
    column1 TEXT,
    column2 TEXT,
    column3 INT,
    PRIMARY KEY (column1)
);

-- INSERT syntax similar to SQL syntax
INSERT INTO users (user_id, first_name, last_name)
VALUES (uuid(), 'John', 'Doe');

-- SELECT also similar to SQL syntax
SELECT *
FROM users;

SELECT first_name, last_name
FROM users;

SELECT *
FROM users
WHERE user_id = some_uuid_value;

-- COPY - used to import/export data between tables and CSV files
COPY table1 (column1, column2, column3) FROM 'table1data.csv';
-- The HEADER option skips the first line in the file
COPY table1 (column1, column2, column3) FROM 'table1data.csv'
WITH HEADER=true;
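COPY also works in the other direction for exports; a quick sketch (the output file name is illustrative):

-- export goes the same way with COPY ... TO
COPY table1 (column1, column2, column3) TO 'table1export.csv'
WITH HEADER=true;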

-- TRUNCATE - removes all data from a table
TRUNCATE table1;

-- Get info about a table
DESCRIBE TABLE table1;

Core datatypes #

  • text:
    • UTF8 encoded strings
    • varchar is the same as text
  • int:
    • signed
    • 32 bits
  • timestamps:
    • date and time
    • 64 bit integer
    • stores the number of milliseconds since the UNIX epoch
  • UUID
    • generate via uuid()
  • TIMEUUID
    • contains timestamp value
    • sortable
    • generate via now()

UUID and TIMEUUID are used in place of integer IDs because Cassandra is a distributed database
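A minimal sketch of generating these IDs at insert time; uuid() and now() are the standard CQL generators, and the comments table here is hypothetical:

-- uuid() generates a random UUID for the row ID
INSERT INTO users (user_id, first_name, last_name)
VALUES (uuid(), 'Jane', 'Doe');

-- now() generates a sortable TIMEUUID (hypothetical comments table)
INSERT INTO comments (user_id, comment_id, body)
VALUES (uuid(), now(), 'first comment');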

Partitions #

Tables are split into partitions by the value of the partition key, and the partitions are distributed over the cluster nodes. The partition key is the first part of the primary key

Clustering columns #

Clustering columns are the other half of the primary key; they are used for sorting data within partitions

The primary key is the most important part of your data model in Cassandra

You can't change the primary key once the table already contains data; in that case you need to redo your data model

There may be multiple clustering columns in a table

To prevent collisions, you may also need to add a uuid field to the primary key

Every query should include a partition key

You can perform either equality (=) or range (<, >) queries on clustering columns

All equality comparisons must come before inequality comparisons

Since data is sorted on disk, range searches are a binary search followed by a linear read
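A small sketch of these rules (table and values are illustrative): equality on the partition key comes first, then equality or range conditions on clustering columns in declaration order.

CREATE TABLE users_by_state (
    state text,
    city text,
    id uuid,
    name text,
    PRIMARY KEY ((state), city, id)
);

-- equality on the partition key, then equality or range on clustering columns
SELECT * FROM users_by_state WHERE state = 'TX' AND city = 'Austin';
SELECT * FROM users_by_state WHERE state = 'TX' AND city > 'Austin';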

Changing default ordering #

By default, clustering columns are ordered ascending

Change ordering direction via WITH CLUSTERING ORDER BY

You must include all clustering columns up to and including the column you wish to order descending

CREATE TABLE users (
    state text,
    city text,
    name text,
    id uuid,
    PRIMARY KEY ((state), city, name, id)
) WITH CLUSTERING ORDER BY (city DESC, name ASC);
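Given the table above, rows within a partition come back in the declared clustering order; the reverse of that order can also be requested at query time (a sketch, 'TX' is an illustrative value):

-- returned ordered by city DESC, then name ASC
SELECT * FROM users WHERE state = 'TX';

-- request the reverse of the declared clustering order
SELECT * FROM users WHERE state = 'TX'
ORDER BY city ASC, name DESC;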

Node #

It is strongly recommended to store Cassandra data on local storage, not on a SAN (or similar)

A node stores its data in a distributed hash table

Approximate performance of one node: 6,000-12,000 transactions/second/core. One node can effectively store 2-4 TB of data

Nodetool #

Tool for node management. Located in ${install_root}/bin/

Sub-commands:

  • help
  • info
  • status
  • describecluster
  • getlogginglevels
  • setlogginglevel
  • settraceprobability 0.1
  • drain
  • stopdaemon
  • flush
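A few of the sub-commands above in action (a sketch; run from ${install_root}/bin/):

nodetool describecluster           # cluster name, snitch, schema versions
nodetool settraceprobability 0.1   # trace 10% of queries
nodetool drain                     # flush memtables and stop accepting writes
nodetool stopdaemon                # stop the node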

Ring #

Ring - the name for a Cassandra cluster.

Each node handles a specific range of the data stored in the cluster. The coordinator is the node that receives a request from a client and then forwards it to the node(s) that handle the right range.

Joining the Cluster #

Nodes join the cluster by communicating with any seed node. Cassandra finds the list of possible seed nodes in cassandra.yaml. Seed nodes communicate the cluster topology to the joining node. Once the new node joins the cluster, all nodes are peers.
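A sketch of the relevant cassandra.yaml piece (addresses are illustrative); the joining node contacts these seeds to learn the topology:

# cassandra.yaml
seed_provider:
    - class_name: org.apache.cassandra.locator.SimpleSeedProvider
      parameters:
          - seeds: "10.0.0.1,10.0.0.2"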

Node states:

  • joining
  • leaving
  • up
  • down

Drivers #

Drivers may choose which node will coordinate a request

Driver policies:

  • TokenAwarePolicy - driver chooses node which contains the data
  • RoundRobinPolicy - driver round robins the ring
  • DCAwareRoundRobinPolicy - driver round robins the target data center

Vnodes #

With Vnodes, each node is now responsible for multiple smaller slices of the ring, instead of the one large slice.

When a new node joins the ring, each existing node streams a few slices to it in parallel, and the new node becomes responsible for those slices.

Adding/removing nodes with vnodes helps keep the cluster balanced. By default, each node has 128 vnodes. VNodes automate token ring assignment.

The number of vnodes may be configured in cassandra.yaml with the num_tokens parameter. Any value greater than one turns on vnodes.
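For example (matching the default mentioned above):

# cassandra.yaml
num_tokens: 128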

Gossip #

Gossip - a broadcast protocol

  • Each node initiates a gossip round every second
  • picks one to three nodes to gossip with
  • nodes can gossip with ANY other node in the cluster
  • fault tolerant - continues to spread when nodes fail

Gossip spreads only node metadata, not the client data.

Endpoint State:

  • Heartbeat State:

    • generation (time since node bootstrapped)
    • version (increments every second and after gossip with another node)
  • Application State (stores node metadata):

    • STATUS:
      • BOOTSTRAP
      • NORMAL
      • LEAVING
      • LEFT
      • REMOVING
      • REMOVED
    • DC (datacenter)
    • RACK
    • SCHEMA (number of schema changes)
    • LOAD (disk space usage)
    • etc
  • SYN digest schema - endpoint_ip:generation:version (e.g. 127.0.0.1:100:20)

  • ACK - also carries a digest of the outdated info

  • ACK2 - sends only the updated info

nodetool gossipinfo
SELECT peer, data_center, host_id, preferred_ip, rack, release_version, rpc_address, schema_version
FROM system.peers;

Snitch #

  • Determines/declares each node’s rack and data center
  • The “topology” of the cluster

There are several types of snitches:

  • Regular:
    • SimpleSnitch
      • default snitch
      • places all nodes in the same datacenter and rack (datacenter1 and rack1)
    • PropertyFileSnitch
      • reads dc and rack info for all nodes from a file
      • you must keep files in sync with all nodes in the cluster
      • cassandra-topology.properties file
    • GossipingPropertyFileSnitch (the most popular type)
      • declare the current node’s DC/Rack info in a file
      • you must set each individual node’s settings
      • but you don’t have to copy settings as with property file snitch
      • Gossip spreads the settings through the cluster
      • cassandra-rackdc.properties file
    • RackInferringSnitch
      • infers the rack and DC from the IP address:
        • 2nd octet is the DC octet
        • 3rd - rack octet
        • 4th - node octet
    • DynamicSnitch
      • layered on top of your actual snitch
      • maintains a pulse of each node's performance
      • determines which node to query replicas from depending on node health
      • turned on by default for all snitches
  • Cloud Based:
    • Ec2Snitch
    • Ec2MultiRegionSnitch
    • GoogleCloudSnitch
    • CloudstackSnitch

Configured in cassandra.yaml:

endpoint_snitch: SimpleSnitch
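With GossipingPropertyFileSnitch, each node additionally declares its own location in cassandra-rackdc.properties (the DC/rack names here are illustrative):

# cassandra-rackdc.properties
dc=DC1
rack=RACK1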

Configuring snitches #

  • All nodes in the cluster must use the same snitch
  • Changing the cluster network topology requires restarting all nodes
  • Run sequential repair and clean up on each node

Replication #

With replication factor (RF) 2, each node stores not only its own data but also its neighbour's data; in this case the coordinator writes the data to two nodes.

With RF=3, each node also stores the data of its neighbour's neighbour.
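In a multi-datacenter cluster the replication factor is set per DC with NetworkTopologyStrategy; a sketch (the datacenter names are illustrative and must match those reported by the snitch):

CREATE KEYSPACE some_keyspace
WITH REPLICATION = {
    'class': 'NetworkTopologyStrategy',
    'dc-east': 3,
    'dc-west': 2
};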

Consistency #

CAP theorem; Cassandra chooses Partition Tolerance and Availability.

Consistency level - the number of acknowledgements the coordinator requires from replica nodes when writing (or reading) data.

  • any (storing a hint at minimum is satisfactory)
  • one, two, three
  • quorum
  • local_one (the closest node to coordinator in the same dc)
  • local_quorum (only for one dc)
  • each_quorum (quorum of nodes in each dc, applies to write only)
  • all
-- cql operator/cqlsh command
CONSISTENCY
# determine which nodes hold the replicas of a given partition key
nodetool getendpoints keyspace_name table_name 'partition_key_value'
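For example, setting the consistency level for the current cqlsh session:

-- set the session consistency level in cqlsh
CONSISTENCY QUORUM
-- show the current level
CONSISTENCY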

Hinted handoff #

If a replica node is down, the coordinator stores the data (as a hint) until the replica comes back online.

Settings:

  • cassandra.yaml
  • you can disable hinted handoff
  • choose a directory to store hint files
  • set the amount of time a node will store a hint
  • default is three hours
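The relevant cassandra.yaml knobs look roughly like this (the directory path is illustrative; the window shown matches the three-hour default above):

# cassandra.yaml
hinted_handoff_enabled: true
hints_directory: /var/lib/cassandra/hints
max_hint_window_in_ms: 10800000    # 3 hours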

Read repair #

With Cassandra, you choose the balance between nodes that are fully in sync with each other and nodes that are highly available.

Read with CL=all:

  • coordinator reads data from the closest node and requests a digest (the data's checksum) from the other replicas
  • coordinator compares the data and the checksums and, if they are equal, sends the data to the client
  • if the checksums don't match:
    • coordinator takes the timestamp from the received data
    • requests full data from all replicas to compare their timestamps
    • after finding the latest version of the data, the coordinator sends it to the outdated replicas and to the client

Read repair chance:

  • performed when a read is at a consistency level less than ALL
  • request reads only a subset of the replicas
  • we can’t be sure replicas are in sync
  • generally you are safe, but no guarantees
  • response sent immediately when the consistency level is met
  • read repair done asynchronously in the background
  • dclocal_read_repair_chance set to 0.1 (10%) by default
    • read repair that is confined to the same DC as the coordinator node
  • read_repair_chance set to 0 by default
    • for a read repair across all DCs with replicas
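These chances are table options, so they can be tuned per table; a sketch using the keyspace/table names from earlier:

ALTER TABLE some_keyspace.table1
WITH dclocal_read_repair_chance = 0.1
AND read_repair_chance = 0.0;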

Nodetool repair #

  • syncs all data in the cluster
  • expensive
    • grows with the amount of data in the cluster
  • use with a cluster servicing high writes/deletes
  • last line of defense
  • run to synchronize a failed node coming back online
  • run on nodes not read from very often
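A sketch of running it for one keyspace, limited to the node's primary ranges so neighbouring nodes don't repeat the same work (the keyspace name is illustrative):

nodetool repair -pr some_keyspace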

Node sync #

Full repairs:

  • full repairs bog down the system
  • the bigger the cluster and dataset, the worse the time
  • in times past, we recommended running full repair within gc_grace_seconds

Node sync:

  • Runs in the background, continuously repairing your data
    • a quiet hum vs. everybody stops what they're doing
  • better to repair in small chunks as we go rather than a full repair
  • automatically enabled by default (in the DataStax version)
    • but you must enable it per table
  • each node runs NodeSync
  • NodeSync continuously validates and repairs data
CREATE TABLE myTable (...)
WITH nodesync = {'enabled': 'true'};

Save Points:

  • each node splits its local range into segments
    • small token range of a table
  • each segment makes a save point
    • NodeSync repairs a segment
    • then NodeSync saves its progress
    • repeat
  • NodeSync prioritizes segments to meet the deadline target

Segment sizes:

  • determining token range in a given segment is a simple recursive split
  • the target is for each segment to be less than 200MB
    • configurable (segment_size_target_bytes), but a good default
    • greater than a partition
    • so partitions greater than 200MB win over segments less than 200MB
  • algorithm doesn’t calculate data size but instead assumes acceptable distribution of data among your cluster

Segment failures:

  • nodes validate/repair segments as a whole
  • if node fails during segment validation, node drops all work for that segment and starts over
  • node records successful segment validation in the system_distributed.nodesync_status table

Segment outcomes:

  • full_in_sync: all replicas were in sync
  • full_repaired: some repair was necessary
  • partial_in_sync: not all replicas responded (at least 2 did), but all that responded were in sync
  • partial_repaired: not all replicas responded (at least 2 did), with some repair needed
  • uncompleted: only one node was available/responded; no validation occurred
  • failed: an unexpected error happened; check the logs

Segment validation:

  • NodeSync simply performs a read repair on the segment
  • read data from all replicas
  • check for inconsistencies
  • repair stale nodes

Write path #

Writes:

  • MemTable (in RAM)
    • always ordered by partition key and clustering column
  • Commit Log (on disk)
    • stored sequentially; every record is just appended to the commit log
  • After the data is stored in the MemTable and the commit log, Cassandra sends an acknowledgement to the client
  • When the MemTable is full, it is flushed to disk as a structure called an SSTable (which is immutable)
  • After the flush, Cassandra can discard the corresponding commit log records, because the sorted data is already on disk

It's recommended to store the commit log on different storage than the SSTables
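In cassandra.yaml that separation looks roughly like this (the paths are illustrative):

# cassandra.yaml
commitlog_directory: /mnt/disk1/cassandra/commitlog
data_file_directories:
    - /mnt/disk2/cassandra/data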

# run stress test
cassandra-stress write no-warmup n=250000 -port native=9041 -rate threads=1
# show data of the stress test
nodetool cfstats keyspace1.standard1

Read path #

Reads:

  • from the MemTable:
    • just find the value in the MemTable with a binary search
  • from an SSTable:
    • each SSTable has a partition index file, which stores partition tokens and their byte offsets
    • Partition summary - an in-memory index into the partition index
    • the location of the partition that was read is stored in the key cache, so the next read of the same partition can skip the index lookup

Bloom filter #

A Bloom filter is a per-SSTable, in-memory probabilistic structure: it can tell Cassandra that a partition is definitely not in that SSTable (avoiding a disk read) or that it might be there, with a small false-positive chance.

DSE Read Path Optimizations #

  • no partition summary
  • partition index changed to a trie-based data structure
  • SSTable lookups in this format scream!
  • huge performance improvements, especially for large SSTables

Compactions #

Compaction is the process of merging several SSTables into one SSTable.

During compaction, Cassandra keeps the more recent data (the cells with the greater timestamps). When you delete data, Cassandra actually writes a tombstone instead of removing the record; during compaction, if the tombstone is older than 10 days (the default, configurable per table via gc_grace_seconds), it is dropped, otherwise it is written to the resulting file.
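The grace period is the gc_grace_seconds table option; a sketch of setting it explicitly to its default, reusing the earlier keyspace/table names:

ALTER TABLE some_keyspace.table1
WITH gc_grace_seconds = 864000;    -- 10 days (the default)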

Compaction strategies #

  • Compaction strategies are configurable. These strategies include:
    • Size Tiered Compaction (default) - triggers when multiple SSTables of a similar size are present
    • Leveled Compaction - groups SSTables into levels, each of which has a fixed size limit which is 10 times larger than the previous level
    • TimeWindow Compaction - creates time windowed buckets of SSTables that are compacted with each other using the Size Tiered Compaction Strategy
  • Use the ALTER TABLE command to change the strategy
ALTER TABLE myKeySpace.myTable
WITH compaction = {'class': 'LeveledCompactionStrategy'};