feat/match_collector: batch match import
This commit is contained in:
+12
-8
@@ -1,21 +1,25 @@
|
|||||||
services:
|
services:
|
||||||
# Development MongoDB with performance optimizations
|
# Development MongoDB with memory optimizations
|
||||||
mongodb:
|
mongodb:
|
||||||
image: mongo:latest
|
image: mongo:8.3.4
|
||||||
container_name: buildpath-mongodb
|
container_name: buildpath-mongodb
|
||||||
ports:
|
ports:
|
||||||
- "27017:27017"
|
- "27017:27017"
|
||||||
environment:
|
environment:
|
||||||
MONGO_INITDB_ROOT_USERNAME: ${MONGO_USER:-root}
|
MONGO_INITDB_ROOT_USERNAME: ${MONGO_USER:-root}
|
||||||
MONGO_INITDB_ROOT_PASSWORD: ${MONGO_PASS:-password}
|
MONGO_INITDB_ROOT_PASSWORD: ${MONGO_PASS:-password}
|
||||||
|
GLIBC_TUNABLES: glibc.pthread.rseq=1
|
||||||
volumes:
|
volumes:
|
||||||
- ./data/db:/data/db
|
- ./data/db:/data/db
|
||||||
command: mongod --wiredTigerCacheSizeGB 4 --quiet
|
# Reduced cache size to leave more RAM for the import script
|
||||||
healthcheck:
|
# WiredTiger cache is now 2GB (was 4GB) to prevent OOM during large imports
|
||||||
test: echo 'db.runCommand("ping").ok' | mongosh localhost:27017/test --quiet
|
command: mongod --wiredTigerCacheSizeGB 2 --quiet
|
||||||
interval: 5s
|
deploy:
|
||||||
timeout: 2s
|
resources:
|
||||||
retries: 30
|
limits:
|
||||||
|
memory: 4G
|
||||||
|
reservations:
|
||||||
|
memory: 2G
|
||||||
|
|
||||||
mongo-express:
|
mongo-express:
|
||||||
image: mongo-express
|
image: mongo-express
|
||||||
|
|||||||
@@ -31,19 +31,29 @@ async function importLargeJsonFile(filePath, collectionName, batchSize = 1000) {
|
|||||||
const collection = db.collection(collectionName);
|
const collection = db.collection(collectionName);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// Create indexes first for better performance
|
// Check file size first
|
||||||
await collection.createIndex({ "metadata.matchId": 1 }, { unique: true });
|
|
||||||
await collection.createIndex({ "info.gameDuration": 1 });
|
|
||||||
await collection.createIndex({ "info.participants.championId": 1 });
|
|
||||||
await collection.createIndex({ "info.participants.win": 1 });
|
|
||||||
|
|
||||||
// Check file size
|
|
||||||
const fileStats = fs.statSync(filePath);
|
const fileStats = fs.statSync(filePath);
|
||||||
const fileSize = (fileStats.size / (1024 * 1024 * 1024)).toFixed(2);
|
const fileSize = (fileStats.size / (1024 * 1024 * 1024)).toFixed(2);
|
||||||
console.log(` 📊 File size: ${fileSize} GB`);
|
console.log(` 📊 File size: ${fileSize} GB`);
|
||||||
|
|
||||||
|
// Defer index creation to after import to reduce memory pressure
|
||||||
|
// Only create the unique matchId index before import to prevent duplicates
|
||||||
|
console.log(` 📇 Creating unique matchId index...`);
|
||||||
|
await collection.createIndex({ "metadata.matchId": 1 }, { unique: true, background: false });
|
||||||
|
|
||||||
await processLineDelimitedFormat(filePath, collection, batchSize, startTime);
|
await processLineDelimitedFormat(filePath, collection, batchSize, startTime);
|
||||||
|
|
||||||
|
// Create additional indexes after import to reduce memory pressure
|
||||||
|
console.log(`\n 📇 Creating additional indexes (this may take a while)...`);
|
||||||
|
try {
|
||||||
|
await collection.createIndex({ "info.gameDuration": 1 }, { background: true });
|
||||||
|
await collection.createIndex({ "info.participants.championId": 1 }, { background: true });
|
||||||
|
await collection.createIndex({ "info.participants.win": 1 }, { background: true });
|
||||||
|
console.log(` ✅ Indexes created successfully`);
|
||||||
|
} catch (indexError) {
|
||||||
|
console.log(` ⚠️ Warning: Could not create additional indexes: ${indexError.message}`);
|
||||||
|
}
|
||||||
|
|
||||||
const totalTime = ((Date.now() - startTime) / 1000).toFixed(1);
|
const totalTime = ((Date.now() - startTime) / 1000).toFixed(1);
|
||||||
console.log(`🎉 Import complete in ${totalTime} seconds`);
|
console.log(`🎉 Import complete in ${totalTime} seconds`);
|
||||||
console.log(`✅ Processed: ${processed.toLocaleString()} matches`);
|
console.log(`✅ Processed: ${processed.toLocaleString()} matches`);
|
||||||
@@ -66,6 +76,7 @@ async function importLargeJsonFile(filePath, collectionName, batchSize = 1000) {
|
|||||||
|
|
||||||
let batch = [];
|
let batch = [];
|
||||||
let lineCount = 0;
|
let lineCount = 0;
|
||||||
|
let batchCount = 0;
|
||||||
|
|
||||||
for await (const line of rl) {
|
for await (const line of rl) {
|
||||||
lineCount++;
|
lineCount++;
|
||||||
@@ -88,9 +99,16 @@ async function importLargeJsonFile(filePath, collectionName, batchSize = 1000) {
|
|||||||
batch.push(match);
|
batch.push(match);
|
||||||
|
|
||||||
if (batch.length >= batchSize) {
|
if (batch.length >= batchSize) {
|
||||||
process.stdout.write(`\r Inserting batch into MongoDB... `);
|
batchCount++;
|
||||||
|
process.stdout.write(`\r Inserting batch #${batchCount} (${batch.length} matches)... `);
|
||||||
await insertBatch(batch, collection);
|
await insertBatch(batch, collection);
|
||||||
batch = [];
|
batch = [];
|
||||||
|
|
||||||
|
// Yield to the event loop every 10 batches so pending callbacks can run (this does not itself force garbage collection)
|
||||||
|
// This helps reduce memory pressure when processing large files
|
||||||
|
if (batchCount % 10 === 0) {
|
||||||
|
await new Promise(resolve => setImmediate(resolve));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
skipped++;
|
skipped++;
|
||||||
@@ -99,8 +117,11 @@ async function importLargeJsonFile(filePath, collectionName, batchSize = 1000) {
|
|||||||
|
|
||||||
// Insert remaining matches
|
// Insert remaining matches
|
||||||
if (batch.length > 0) {
|
if (batch.length > 0) {
|
||||||
|
process.stdout.write(`\r Inserting final batch (${batch.length} matches)... `);
|
||||||
await insertBatch(batch, collection);
|
await insertBatch(batch, collection);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
console.log(`\n 📊 Total batches inserted: ${batchCount + 1}`);
|
||||||
}
|
}
|
||||||
|
|
||||||
async function insertBatch(batch, collection) {
|
async function insertBatch(batch, collection) {
|
||||||
|
|||||||
@@ -391,16 +391,35 @@ async function handleMatchList(
|
|||||||
const database = client.db('matches')
|
const database = client.db('matches')
|
||||||
const collectionName = platform ? `${patch}_${platform}` : patch
|
const collectionName = platform ? `${patch}_${platform}` : patch
|
||||||
const matches = database.collection(collectionName)
|
const matches = database.collection(collectionName)
|
||||||
const allMatches = matches.find()
|
|
||||||
const totalMatches: number = await matches.countDocuments()
|
const totalMatches: number = await matches.countDocuments()
|
||||||
|
|
||||||
|
// Process matches in batches to limit memory usage
|
||||||
|
const BATCH_SIZE = 1000
|
||||||
let currentMatch = 0
|
let currentMatch = 0
|
||||||
for await (const match of allMatches) {
|
let processedInBatch = 0
|
||||||
|
|
||||||
|
// Use cursor with batch size to limit memory consumption
|
||||||
|
const cursor = matches.find().batchSize(BATCH_SIZE)
|
||||||
|
|
||||||
|
try {
|
||||||
|
for await (const match of cursor) {
|
||||||
process.stdout.write(
|
process.stdout.write(
|
||||||
'\rComputing champion stats, game entry ' + currentMatch + '/' + totalMatches + ' ... '
|
'\rComputing champion stats, game entry ' + currentMatch + '/' + totalMatches + ' ... '
|
||||||
)
|
)
|
||||||
currentMatch += 1
|
currentMatch += 1
|
||||||
|
processedInBatch += 1
|
||||||
handleMatch(match as unknown as Match, champions, platform)
|
handleMatch(match as unknown as Match, champions, platform)
|
||||||
|
|
||||||
|
// Yield to the event loop once every BATCH_SIZE matches so other pending work can run
|
||||||
|
if (processedInBatch >= BATCH_SIZE) {
|
||||||
|
processedInBatch = 0
|
||||||
|
// setImmediate resumes on a later event-loop iteration; it does not explicitly trigger garbage collection
|
||||||
|
await new Promise(resolve => setImmediate(resolve))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
// Ensure cursor is closed
|
||||||
|
await cursor.close()
|
||||||
}
|
}
|
||||||
|
|
||||||
return totalMatches
|
return totalMatches
|
||||||
@@ -736,6 +755,48 @@ async function championList() {
|
|||||||
return list.slice(1)
|
return list.slice(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
 * Run MongoDB's `compact` command on every per-platform matches collection.
 *
 * Collections live in the `matches` database and are named
 * `${patch}_${platform}`. Any failure is logged and swallowed on purpose, so
 * one failing collection never prevents the remaining ones from being tried.
 *
 * NOTE(review): MongoDB documents `compact` as rewriting and defragmenting a
 * collection's data and indexes so unneeded disk space can be released.
 * Whether it also lowers the server's memory usage should be confirmed before
 * relying on it for that purpose.
 *
 * @param client - Connected client used to reach the `matches` database.
 * @param patch - Patch identifier used as the collection-name prefix.
 * @param platforms - Platform identifiers; one collection is compacted per entry.
 * @returns A promise that resolves once every collection has been attempted.
 */
async function compactMatchesCollections(
  client: MongoClient,
  patch: string,
  platforms: string[]
): Promise<void> {
  const database = client.db('matches')

  console.log('\n=== Compacting matches collections to release memory ===')

  for (const platform of platforms) {
    const collectionName = `${patch}_${platform}`
    console.log(`Compacting collection: ${collectionName}...`)

    try {
      // compact must be issued against the database that owns the collection
      const result = await database.command({ compact: collectionName, force: true })
      console.log(`Compaction result for ${collectionName}:`, result)
    } catch (error) {
      // Compaction is best-effort (missing collection, missing privileges, ...):
      // report the problem and carry on with the next platform.
      const errorMsg = error instanceof Error ? error.message : String(error)

      // Server errors carry the symbolic name in `codeName` (numeric `code` 26),
      // not necessarily in the message text, so inspect those fields first and
      // keep the message check only as a fallback.
      const details =
        typeof error === 'object' && error !== null
          ? (error as { code?: unknown; codeName?: unknown })
          : {}
      const isMissingCollection =
        details.codeName === 'NamespaceNotFound' ||
        details.code === 26 ||
        errorMsg.includes('NamespaceNotFound')

      if (isMissingCollection) {
        console.log(`Note: Collection ${collectionName} not found, skipping compaction`)
      } else {
        console.log(`Note: Could not compact ${collectionName}:`, errorMsg)
      }
    }
  }

  console.log('Compaction complete.')
}
|
||||||
|
|
||||||
async function makeChampionsStats(client: MongoClient, patch: string, platforms: string[] = []) {
|
async function makeChampionsStats(client: MongoClient, patch: string, platforms: string[] = []) {
|
||||||
const globalItems = await itemList()
|
const globalItems = await itemList()
|
||||||
for (const item of globalItems) {
|
for (const item of globalItems) {
|
||||||
@@ -766,6 +827,12 @@ async function makeChampionsStats(client: MongoClient, patch: string, platforms:
|
|||||||
const platformMatches = await handleMatchList(client, patch, champions, platform)
|
const platformMatches = await handleMatchList(client, patch, champions, platform)
|
||||||
totalMatches += platformMatches
|
totalMatches += platformMatches
|
||||||
console.log(`Processed ${platformMatches} matches from ${platform}`)
|
console.log(`Processed ${platformMatches} matches from ${platform}`)
|
||||||
|
|
||||||
|
// NOTE: despite the log message below, nothing is cleared here — itemDict is only checked for size
|
||||||
|
// TODO: either actually clear itemDict at this point or remove this block
|
||||||
|
if (itemDict.size > 0) {
|
||||||
|
console.log(`Clearing item cache to free memory...`)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
console.log(`\n=== Total matches processed: ${totalMatches} ===`)
|
console.log(`\n=== Total matches processed: ${totalMatches} ===`)
|
||||||
@@ -781,6 +848,9 @@ async function makeChampionsStats(client: MongoClient, patch: string, platforms:
|
|||||||
// Create alias-index for better key-find
|
// Create alias-index for better key-find
|
||||||
await collection.createIndex({ alias: 1 })
|
await collection.createIndex({ alias: 1 })
|
||||||
console.log(`Stats saved to collection: ${patch}`)
|
console.log(`Stats saved to collection: ${patch}`)
|
||||||
|
|
||||||
|
// Compact matches collections to release memory back to the OS
|
||||||
|
await compactMatchesCollections(client, patch, platforms)
|
||||||
}
|
}
|
||||||
|
|
||||||
export default { makeChampionsStats }
|
export default { makeChampionsStats }
|
||||||
|
|||||||
Reference in New Issue
Block a user