DynamoDB - Attività tabella

I flussi DynamoDB ti consentono di monitorare e rispondere alle modifiche degli elementi della tabella. Utilizzare questa funzionalità per creare un'applicazione che risponda ai cambiamenti aggiornando le informazioni attraverso le fonti. Sincronizza i dati per migliaia di utenti di un grande sistema multiutente. Usalo per inviare notifiche agli utenti sugli aggiornamenti. Le sue applicazioni si dimostrano diverse e sostanziali. I flussi DynamoDB fungono da strumento principale utilizzato per ottenere questa funzionalità.

I flussi acquisiscono sequenze ordinate nel tempo contenenti modifiche di elementi all'interno di una tabella. Conservano questi dati per un massimo di 24 ore. Le applicazioni li utilizzano per visualizzare gli elementi originali e modificati, quasi in tempo reale.

I flussi abilitati su una tabella acquisiscono tutte le modifiche. Su qualsiasi operazione CRUD, DynamoDB crea un record di flusso con gli attributi della chiave primaria degli elementi modificati. È possibile configurare i flussi per ulteriori informazioni come le immagini prima e dopo.

Gli Stream hanno due garanzie:

  • Ogni record appare una volta nello stream e

  • Ogni modifica di un elemento risulta nei record del flusso dello stesso ordine di quello delle modifiche.

Tutti i flussi vengono elaborati in tempo reale per consentirti di utilizzarli per le funzionalità correlate nelle applicazioni.

Gestione dei flussi

Durante la creazione della tabella, puoi abilitare uno stream. Le tabelle esistenti consentono la disabilitazione del flusso o la modifica delle impostazioni. I flussi offrono la funzionalità dell'operazione asincrona, il che significa nessun impatto sulle prestazioni della tabella.

Utilizza la console di gestione AWS per una semplice gestione del flusso. Per prima cosa, vai alla console e scegliTables. Nella scheda Panoramica, scegliManage Stream. All'interno della finestra, seleziona le informazioni aggiunte a un flusso sulle modifiche ai dati della tabella. Dopo aver inserito tutte le impostazioni, selezionareEnable.

Se desideri disabilitare gli stream esistenti, seleziona Manage Stream, e poi Disable.

È inoltre possibile utilizzare le API CreateTable e UpdateTable per abilitare o modificare un flusso. Utilizzare il parametro StreamSpecification per configurare il flusso. StreamEnabled specifica lo stato, ovvero true per abilitato e falso per disabilitato.

StreamViewType specifica le informazioni aggiunte allo stream: KEYS_ONLY, NEW_IMAGE, OLD_IMAGE e NEW_AND_OLD_IMAGES.

Stream Reading

Leggere ed elaborare i flussi connettendosi a un endpoint ed effettuando richieste API. Ogni flusso è costituito da record di flusso e ogni record esiste come una singola modifica che possiede il flusso. I record di flusso includono un numero di sequenza che rivela l'ordine di pubblicazione. I record appartengono a gruppi noti anche come frammenti. I frammenti fungono da contenitori per diversi record e contengono anche le informazioni necessarie per accedere e attraversare i record. Dopo 24 ore, i record vengono eliminati automaticamente.

Questi frammenti vengono generati ed eliminati secondo necessità e non durano a lungo. Inoltre si dividono automaticamente in più nuovi frammenti, in genere in risposta ai picchi di attività di scrittura. Durante la disattivazione del flusso, i frammenti aperti si chiudono. La relazione gerarchica tra i frammenti indica che le applicazioni devono dare la priorità ai frammenti principali per un corretto ordine di elaborazione. Puoi utilizzare Kinesis Adapter per farlo automaticamente.

Note - Le operazioni che non comportano modifiche non scrivono record di flusso.

L'accesso e l'elaborazione dei record richiede l'esecuzione delle seguenti attività:

  • Determina l'ARN del flusso di destinazione.
  • Determina i frammenti del flusso che contengono i record di destinazione.
  • Accedi agli shard per recuperare i record desiderati.

Note- Ci dovrebbero essere un massimo di 2 processi che leggono un frammento contemporaneamente. Se supera 2 processi, può limitare la sorgente.

Le azioni dell'API di flusso disponibili includono

  • ListStreams
  • DescribeStream
  • GetShardIterator
  • GetRecords

Puoi rivedere il seguente esempio di lettura del flusso:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClient;

import com.amazonaws.services.dynamodbv2.model.AttributeAction;
import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.AttributeValueUpdate;

import com.amazonaws.services.dynamodbv2.model.CreateTableRequest;
import com.amazonaws.services.dynamodbv2.model.DescribeStreamRequest;
import com.amazonaws.services.dynamodbv2.model.DescribeStreamResult;
import com.amazonaws.services.dynamodbv2.model.DescribeTableResult;

import com.amazonaws.services.dynamodbv2.model.GetRecordsRequest;
import com.amazonaws.services.dynamodbv2.model.GetRecordsResult;
import com.amazonaws.services.dynamodbv2.model.GetShardIteratorRequest;
import com.amazonaws.services.dynamodbv2.model.GetShardIteratorResult;

import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
import com.amazonaws.services.dynamodbv2.model.KeyType;
import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput;
import com.amazonaws.services.dynamodbv2.model.Record;

import com.amazonaws.services.dynamodbv2.model.Shard;
import com.amazonaws.services.dynamodbv2.model.ShardIteratorType;
import com.amazonaws.services.dynamodbv2.model.StreamSpecification;
import com.amazonaws.services.dynamodbv2.model.StreamViewType;
import com.amazonaws.services.dynamodbv2.util.Tables;

public class StreamsExample {
   private static AmazonDynamoDBClient dynamoDBClient =  
      new AmazonDynamoDBClient(new ProfileCredentialsProvider());  
   private static AmazonDynamoDBStreamsClient streamsClient =  
      new AmazonDynamoDBStreamsClient(new ProfileCredentialsProvider());  

   public static void main(String args[]) {  
      dynamoDBClient.setEndpoint("InsertDbEndpointHere");   
      streamsClient.setEndpoint("InsertStreamEndpointHere");    
      
      // table creation 
      String tableName = "MyTestingTable";  
      ArrayList<AttributeDefinition> attributeDefinitions =  
         new ArrayList<AttributeDefinition>();  
      
      attributeDefinitions.add(new AttributeDefinition()
         .withAttributeName("ID") 
         .withAttributeType("N"));
         
      ArrayList<KeySchemaElement> keySchema = new 
         ArrayList<KeySchemaElement>(); 
      
      keySchema.add(new KeySchemaElement() 
         .withAttributeName("ID") 
         .withKeyType(KeyType.HASH));                       //Partition key

      StreamSpecification streamSpecification = new StreamSpecification(); 
      streamSpecification.setStreamEnabled(true); 
      streamSpecification.setStreamViewType(StreamViewType.NEW_AND_OLD_IMAGES);  
      CreateTableRequest createTableRequest = new CreateTableRequest() 
         .withTableName(tableName) 
         .withKeySchema(keySchema) 
         .withAttributeDefinitions(attributeDefinitions) 
         .withProvisionedThroughput(new ProvisionedThroughput() 
         .withReadCapacityUnits(1L) 
         .withWriteCapacityUnits(1L))
         .withStreamSpecification(streamSpecification);  
      
      System.out.println("Executing CreateTable for " + tableName); 
      dynamoDBClient.createTable(createTableRequest);  
      System.out.println("Creating " + tableName); 
      
      try { 
         Tables.awaitTableToBecomeActive(dynamoDBClient, tableName); 
      } catch (InterruptedException e) { 
         e.printStackTrace(); 
      } 
         
      // Get the table's stream settings 
      DescribeTableResult describeTableResult =
         dynamoDBClient.describeTable(tableName);  
      
      String myStreamArn = describeTableResult.getTable().getLatestStreamArn(); 
      StreamSpecification myStreamSpec =  
         describeTableResult.getTable().getStreamSpecification();  
      
      System.out.println("Current stream ARN for " + tableName + ": "+ myStreamArn);
      System.out.println("Stream enabled: "+ myStreamSpec.getStreamEnabled()); 
      System.out.println("Update view type: "+ myStreamSpec.getStreamViewType());  
      
      // Add an item 
      int numChanges = 0; 
      System.out.println("Making some changes to table data"); 
      Map<String, AttributeValue> item = new HashMap<String, AttributeValue>(); 
      item.put("ID", new AttributeValue().withN("222")); 
      item.put("Alert", new AttributeValue().withS("item!")); 
      dynamoDBClient.putItem(tableName, item); 
      numChanges++;  
      
      // Update the item         
      Map<String, AttributeValue> key = new HashMap<String, AttributeValue>(); 
      key.put("ID", new AttributeValue().withN("222")); 
      Map<String, AttributeValueUpdate> attributeUpdates =  
      new HashMap<String, AttributeValueUpdate>(); 
      
      attributeUpdates.put("Alert", new AttributeValueUpdate() 
         .withAction(AttributeAction.PUT) 
         .withValue(new AttributeValue().withS("modified item"))); 
      
      dynamoDBClient.updateItem(tableName, key, attributeUpdates); 
      numChanges++;   
      
      // Delete the item         
      dynamoDBClient.deleteItem(tableName, key);  
      numChanges++;
      
      // Get stream shards         
      DescribeStreamResult describeStreamResult =  
      streamsClient.describeStream(new DescribeStreamRequest() 
         .withStreamArn(myStreamArn)); 
      String streamArn =  
         describeStreamResult.getStreamDescription().getStreamArn(); 
      List<Shard> shards =  
         describeStreamResult.getStreamDescription().getShards();  
      
      // Process shards 
      for (Shard shard : shards) { 
         String shardId = shard.getShardId(); 
         System.out.println("Processing " + shardId + " in "+ streamArn);  
         
         // Get shard iterator 
         GetShardIteratorRequest getShardIteratorRequest = new 
            GetShardIteratorRequest() 
            .withStreamArn(myStreamArn) 
            .withShardId(shardId) 
            .withShardIteratorType(ShardIteratorType.TRIM_HORIZON); 
         
         GetShardIteratorResult getShardIteratorResult =  
            streamsClient.getShardIterator(getShardIteratorRequest); 
         String nextItr = getShardIteratorResult.getShardIterator();  
         
         while (nextItr != null && numChanges > 0) { 
            // Read data records with iterator                 
            GetRecordsResult getRecordsResult =  
               streamsClient.getRecords(new GetRecordsRequest(). 
               withShardIterator(nextItr));
               
            List<Record> records = getRecordsResult.getRecords(); 
            System.out.println("Pulling records...");  
               
            for (Record record : records) { 
               System.out.println(record); 
               numChanges--;
            } 
            nextItr = getRecordsResult.getNextShardIterator(); 
         } 
      } 
   } 
}