cleanup migrations, json, jsonl

This commit is contained in:
Nick Sweeting
2025-12-31 15:36:13 -08:00
parent 0930911a15
commit a04e4a7345
21 changed files with 993 additions and 1418 deletions

View File

@@ -100,46 +100,8 @@ class Migration(migrations.Migration):
CREATE INDEX IF NOT EXISTS machine_binary_status_idx ON machine_binary(status);
CREATE INDEX IF NOT EXISTS machine_binary_retry_at_idx ON machine_binary(retry_at);
-- Create machine_process table
CREATE TABLE IF NOT EXISTS machine_process (
id TEXT PRIMARY KEY NOT NULL,
created_at DATETIME NOT NULL,
modified_at DATETIME NOT NULL,
machine_id TEXT NOT NULL,
binary_id TEXT,
iface_id TEXT,
pwd VARCHAR(512) NOT NULL DEFAULT '',
cmd TEXT NOT NULL DEFAULT '[]',
env TEXT NOT NULL DEFAULT '{}',
timeout INTEGER NOT NULL DEFAULT 120,
pid INTEGER,
exit_code INTEGER,
stdout TEXT NOT NULL DEFAULT '',
stderr TEXT NOT NULL DEFAULT '',
started_at DATETIME,
ended_at DATETIME,
url VARCHAR(2048),
status VARCHAR(16) NOT NULL DEFAULT 'queued',
retry_at DATETIME,
FOREIGN KEY (machine_id) REFERENCES machine_machine(id) ON DELETE CASCADE,
FOREIGN KEY (binary_id) REFERENCES machine_binary(id) ON DELETE SET NULL,
FOREIGN KEY (iface_id) REFERENCES machine_networkinterface(id) ON DELETE SET NULL
);
CREATE INDEX IF NOT EXISTS machine_process_status_idx ON machine_process(status);
CREATE INDEX IF NOT EXISTS machine_process_retry_at_idx ON machine_process(retry_at);
CREATE INDEX IF NOT EXISTS machine_process_machine_id_idx ON machine_process(machine_id);
CREATE INDEX IF NOT EXISTS machine_process_binary_id_idx ON machine_process(binary_id);
CREATE INDEX IF NOT EXISTS machine_process_machine_status_retry_idx ON machine_process(machine_id, status, retry_at);
""",
reverse_sql="""
DROP TABLE IF EXISTS machine_process;
DROP TABLE IF EXISTS machine_binary;
DROP TABLE IF EXISTS machine_networkinterface;
DROP TABLE IF EXISTS machine_machine;
@@ -167,6 +129,8 @@ class Migration(migrations.Migration):
('os_kernel', models.CharField(default=None, max_length=255)),
('stats', models.JSONField(blank=True, default=dict, null=True)),
('config', models.JSONField(blank=True, default=dict, help_text='Machine-specific config overrides (e.g., resolved binary paths like WGET_BINARY)', null=True)),
('num_uses_succeeded', models.PositiveIntegerField(default=0)),
('num_uses_failed', models.PositiveIntegerField(default=0)),
],
options={
'app_label': 'machine',
@@ -189,6 +153,8 @@ class Migration(migrations.Migration):
('region', models.CharField(default=None, max_length=63)),
('country', models.CharField(default=None, max_length=63)),
('machine', models.ForeignKey(default=None, on_delete=django.db.models.deletion.CASCADE, to='machine.machine')),
('num_uses_succeeded', models.PositiveIntegerField(default=0)),
('num_uses_failed', models.PositiveIntegerField(default=0)),
],
options={
'unique_together': {('machine', 'ip_public', 'ip_local', 'mac_address', 'dns_server')},
@@ -212,6 +178,8 @@ class Migration(migrations.Migration):
('retry_at', models.DateTimeField(blank=True, db_index=True, default=django.utils.timezone.now, help_text='When to retry this binary installation', null=True)),
('output_dir', models.CharField(blank=True, default='', help_text='Directory where installation hook logs are stored', max_length=255)),
('machine', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='machine.machine')),
('num_uses_succeeded', models.PositiveIntegerField(default=0)),
('num_uses_failed', models.PositiveIntegerField(default=0)),
],
options={
'verbose_name': 'Binary',
@@ -220,43 +188,6 @@ class Migration(migrations.Migration):
'app_label': 'machine',
},
),
migrations.CreateModel(
name='Process',
fields=[
('id', models.UUIDField(default=uuid7, editable=False, primary_key=True, serialize=False, unique=True)),
('created_at', models.DateTimeField(db_index=True, default=django.utils.timezone.now)),
('modified_at', models.DateTimeField(auto_now=True)),
('pwd', models.CharField(blank=True, default='', help_text='Working directory for process execution', max_length=512)),
('cmd', models.JSONField(blank=True, default=list, help_text='Command as array of arguments')),
('env', models.JSONField(blank=True, default=dict, help_text='Environment variables for process')),
('timeout', models.IntegerField(default=120, help_text='Timeout in seconds')),
('pid', models.IntegerField(blank=True, default=None, help_text='OS process ID', null=True)),
('exit_code', models.IntegerField(blank=True, default=None, help_text='Process exit code (0 = success)', null=True)),
('stdout', models.TextField(blank=True, default='', help_text='Standard output from process')),
('stderr', models.TextField(blank=True, default='', help_text='Standard error from process')),
('started_at', models.DateTimeField(blank=True, default=None, help_text='When process was launched', null=True)),
('ended_at', models.DateTimeField(blank=True, default=None, help_text='When process completed/terminated', null=True)),
('url', models.URLField(blank=True, default=None, help_text='Connection URL (CDP endpoint, sonic server, etc.)', max_length=2048, null=True)),
('status', models.CharField(choices=[('queued', 'Queued'), ('running', 'Running'), ('exited', 'Exited')], db_index=True, default='queued', max_length=16)),
('retry_at', models.DateTimeField(blank=True, db_index=True, default=django.utils.timezone.now, help_text='When to retry this process', null=True)),
('machine', models.ForeignKey(help_text='Machine where this process executed', on_delete=django.db.models.deletion.CASCADE, related_name='process_set', to='machine.machine')),
('binary', models.ForeignKey(blank=True, help_text='Binary used by this process', null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='process_set', to='machine.binary')),
('iface', models.ForeignKey(blank=True, help_text='Network interface used by this process', null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='process_set', to='machine.networkinterface')),
],
options={
'verbose_name': 'Process',
'verbose_name_plural': 'Processes',
'app_label': 'machine',
},
),
migrations.AddIndex(
model_name='process',
index=models.Index(fields=['machine', 'status', 'retry_at'], name='machine_pro_machine_5e3a87_idx'),
),
migrations.AddIndex(
model_name='process',
index=models.Index(fields=['binary', 'exit_code'], name='machine_pro_binary__7bd19c_idx'),
),
],
),
]

View File

@@ -0,0 +1,45 @@
# Generated by Django 6.0 on 2025-12-31 22:54
import django.db.models.deletion
import django.utils.timezone
import uuid
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('machine', '0001_initial'),
]
operations = [
migrations.CreateModel(
name='Process',
fields=[
('id', models.UUIDField(default=uuid.uuid7, editable=False, primary_key=True, serialize=False, unique=True)),
('created_at', models.DateTimeField(db_index=True, default=django.utils.timezone.now)),
('modified_at', models.DateTimeField(auto_now=True)),
('pwd', models.CharField(blank=True, default='', help_text='Working directory for process execution', max_length=512)),
('cmd', models.JSONField(blank=True, default=list, help_text='Command as array of arguments')),
('env', models.JSONField(blank=True, default=dict, help_text='Environment variables for process')),
('timeout', models.IntegerField(default=120, help_text='Timeout in seconds')),
('pid', models.IntegerField(blank=True, default=None, help_text='OS process ID', null=True)),
('exit_code', models.IntegerField(blank=True, default=None, help_text='Process exit code (0 = success)', null=True)),
('stdout', models.TextField(blank=True, default='', help_text='Standard output from process')),
('stderr', models.TextField(blank=True, default='', help_text='Standard error from process')),
('started_at', models.DateTimeField(blank=True, default=None, help_text='When process was launched', null=True)),
('ended_at', models.DateTimeField(blank=True, default=None, help_text='When process completed/terminated', null=True)),
('url', models.URLField(blank=True, default=None, help_text='Connection URL (CDP endpoint, sonic server, etc.)', max_length=2048, null=True)),
('status', models.CharField(choices=[('queued', 'Queued'), ('running', 'Running'), ('exited', 'Exited')], db_index=True, default='queued', max_length=16)),
('retry_at', models.DateTimeField(blank=True, db_index=True, default=django.utils.timezone.now, help_text='When to retry this process', null=True)),
('binary', models.ForeignKey(blank=True, help_text='Binary used by this process', null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='process_set', to='machine.binary')),
('iface', models.ForeignKey(blank=True, help_text='Network interface used by this process', null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='process_set', to='machine.networkinterface')),
('machine', models.ForeignKey(help_text='Machine where this process executed', on_delete=django.db.models.deletion.CASCADE, related_name='process_set', to='machine.machine')),
],
options={
'verbose_name': 'Process',
'verbose_name_plural': 'Processes',
'indexes': [models.Index(fields=['machine', 'status', 'retry_at'], name='machine_pro_machine_5e3a87_idx'), models.Index(fields=['binary', 'exit_code'], name='machine_pro_binary__7bd19c_idx')],
},
),
]

View File

@@ -1,101 +0,0 @@
# Generated on 2025-12-31
# Adds parent FK and process_type field to Process model
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
dependencies = [
('machine', '0001_initial'),
]
operations = [
migrations.SeparateDatabaseAndState(
database_operations=[
migrations.RunSQL(
sql="""
-- Add parent_id FK column to machine_process
ALTER TABLE machine_process ADD COLUMN parent_id TEXT REFERENCES machine_process(id) ON DELETE SET NULL;
CREATE INDEX IF NOT EXISTS machine_process_parent_id_idx ON machine_process(parent_id);
-- Add process_type column with default 'binary'
ALTER TABLE machine_process ADD COLUMN process_type VARCHAR(16) NOT NULL DEFAULT 'binary';
CREATE INDEX IF NOT EXISTS machine_process_process_type_idx ON machine_process(process_type);
-- Add composite index for parent + status queries
CREATE INDEX IF NOT EXISTS machine_process_parent_status_idx ON machine_process(parent_id, status);
-- Add composite index for machine + pid + started_at (for PID reuse protection)
CREATE INDEX IF NOT EXISTS machine_process_machine_pid_started_idx ON machine_process(machine_id, pid, started_at);
""",
# Migration is irreversible due to SQLite limitations
# SQLite doesn't support DROP COLUMN, would require table rebuild
reverse_sql=migrations.RunSQL.noop
),
],
state_operations=[
# Add parent FK
migrations.AddField(
model_name='process',
name='parent',
field=models.ForeignKey(
blank=True,
help_text='Parent process that spawned this one',
null=True,
on_delete=django.db.models.deletion.SET_NULL,
related_name='children',
to='machine.process',
),
),
# Add process_type field
migrations.AddField(
model_name='process',
name='process_type',
field=models.CharField(
choices=[
('cli', 'CLI Command'),
('supervisord', 'Supervisord Daemon'),
('orchestrator', 'Orchestrator'),
('worker', 'Worker Process'),
('hook', 'Hook Script'),
('binary', 'Binary Execution'),
],
default='binary',
help_text='Type of process in the execution hierarchy',
max_length=16,
),
),
# Add indexes - must match the SQL index names exactly
migrations.AddIndex(
model_name='process',
index=models.Index(
fields=['parent'],
name='machine_process_parent_id_idx',
),
),
migrations.AddIndex(
model_name='process',
index=models.Index(
fields=['process_type'],
name='machine_process_process_type_idx',
),
),
migrations.AddIndex(
model_name='process',
index=models.Index(
fields=['parent', 'status'],
name='machine_process_parent_status_idx',
),
),
migrations.AddIndex(
model_name='process',
index=models.Index(
fields=['machine', 'pid', 'started_at'],
name='machine_process_machine_pid_started_idx',
),
),
],
),
]

View File

@@ -82,13 +82,38 @@ class Machine(ModelWithHealthStats):
)
return _CURRENT_MACHINE
@staticmethod
def from_jsonl(record: dict, overrides: dict = None):
def to_json(self) -> dict:
"""
Update Machine config from JSONL record.
Convert Machine model instance to a JSON-serializable dict.
"""
from archivebox.config import VERSION
return {
'type': 'Machine',
'schema_version': VERSION,
'id': str(self.id),
'guid': self.guid,
'hostname': self.hostname,
'hw_in_docker': self.hw_in_docker,
'hw_in_vm': self.hw_in_vm,
'hw_manufacturer': self.hw_manufacturer,
'hw_product': self.hw_product,
'hw_uuid': self.hw_uuid,
'os_arch': self.os_arch,
'os_family': self.os_family,
'os_platform': self.os_platform,
'os_kernel': self.os_kernel,
'os_release': self.os_release,
'stats': self.stats,
'config': self.config or {},
}
@staticmethod
def from_json(record: dict, overrides: dict = None):
"""
Update Machine config from JSON dict.
Args:
record: JSONL record with '_method': 'update', 'key': '...', 'value': '...'
record: JSON dict with '_method': 'update', 'key': '...', 'value': '...'
overrides: Not used
Returns:
@@ -255,9 +280,9 @@ class Binary(ModelWithHealthStats):
'is_valid': self.is_valid,
}
def to_jsonl(self) -> dict:
def to_json(self) -> dict:
"""
Convert Binary model instance to a JSONL record.
Convert Binary model instance to a JSON-serializable dict.
"""
from archivebox.config import VERSION
return {
@@ -274,17 +299,17 @@ class Binary(ModelWithHealthStats):
}
@staticmethod
def from_jsonl(record: dict, overrides: dict = None):
def from_json(record: dict, overrides: dict = None):
"""
Create/update Binary from JSONL record.
Create/update Binary from JSON dict.
Handles two cases:
1. From binaries.jsonl: creates queued binary with name, binproviders, overrides
1. From binaries.json: creates queued binary with name, binproviders, overrides
2. From hook output: updates binary with abspath, version, sha256, binprovider
Args:
record: JSONL record with 'name' and either:
- 'binproviders', 'overrides' (from binaries.jsonl)
record: JSON dict with 'name' and either:
- 'binproviders', 'overrides' (from binaries.json)
- 'abspath', 'version', 'sha256', 'binprovider' (from hook output)
overrides: Not used
@@ -542,7 +567,7 @@ class ProcessManager(models.Manager):
return process
class Process(ModelWithHealthStats):
class Process(models.Model):
"""
Tracks a single OS process execution.
@@ -563,38 +588,11 @@ class Process(ModelWithHealthStats):
RUNNING = 'running', 'Running'
EXITED = 'exited', 'Exited'
class TypeChoices(models.TextChoices):
CLI = 'cli', 'CLI Command'
SUPERVISORD = 'supervisord', 'Supervisord Daemon'
ORCHESTRATOR = 'orchestrator', 'Orchestrator'
WORKER = 'worker', 'Worker Process'
HOOK = 'hook', 'Hook Script'
BINARY = 'binary', 'Binary Execution'
# Primary fields
id = models.UUIDField(primary_key=True, default=uuid7, editable=False, unique=True)
created_at = models.DateTimeField(default=timezone.now, db_index=True)
modified_at = models.DateTimeField(auto_now=True)
# Parent process FK for hierarchy tracking
parent = models.ForeignKey(
'self',
on_delete=models.SET_NULL,
null=True,
blank=True,
related_name='children',
help_text='Parent process that spawned this one'
)
# Process type for distinguishing in hierarchy
process_type = models.CharField(
max_length=16,
choices=TypeChoices.choices,
default=TypeChoices.BINARY,
db_index=True,
help_text='Type of process in the execution hierarchy'
)
# Machine FK - required (every process runs on a machine)
machine = models.ForeignKey(
Machine,
@@ -667,10 +665,6 @@ class Process(ModelWithHealthStats):
help_text='When to retry this process'
)
# Health stats
num_uses_failed = models.PositiveIntegerField(default=0)
num_uses_succeeded = models.PositiveIntegerField(default=0)
state_machine_name: str = 'archivebox.machine.models.ProcessMachine'
objects: ProcessManager = ProcessManager()
@@ -682,8 +676,6 @@ class Process(ModelWithHealthStats):
indexes = [
models.Index(fields=['machine', 'status', 'retry_at']),
models.Index(fields=['binary', 'exit_code']),
models.Index(fields=['parent', 'status']),
models.Index(fields=['machine', 'pid', 'started_at']),
]
def __str__(self) -> str:
@@ -716,9 +708,9 @@ class Process(ModelWithHealthStats):
return self.archiveresult.hook_name
return ''
def to_jsonl(self) -> dict:
def to_json(self) -> dict:
"""
Convert Process model instance to a JSONL record.
Convert Process model instance to a JSON-serializable dict.
"""
from archivebox.config import VERSION
record = {
@@ -742,6 +734,26 @@ class Process(ModelWithHealthStats):
record['timeout'] = self.timeout
return record
@staticmethod
def from_json(record: dict, overrides: dict = None):
"""
Create/update Process from JSON dict.
Args:
record: JSON dict with 'id' or process details
overrides: Optional dict of field overrides
Returns:
Process instance or None
"""
process_id = record.get('id')
if process_id:
try:
return Process.objects.get(id=process_id)
except Process.DoesNotExist:
pass
return None
def update_and_requeue(self, **kwargs):
"""
Update process fields and requeue for worker state machine.
@@ -1751,17 +1763,12 @@ class ProcessMachine(BaseStateMachine, strict_states=True):
@exited.enter
def enter_exited(self):
"""Process has exited."""
success = self.process.exit_code == 0
self.process.update_and_requeue(
retry_at=None,
status=Process.StatusChoices.EXITED,
ended_at=timezone.now(),
)
# Increment health stats based on exit code
self.process.increment_health_stats(success=success)
# =============================================================================
# State Machine Registration

View File

@@ -76,7 +76,7 @@ class TestMachineModel(TestCase):
self.assertEqual(machine1.guid, machine2.guid)
def test_machine_from_jsonl_update(self):
"""Machine.from_jsonl() should update machine config."""
"""Machine.from_json() should update machine config."""
Machine.current() # Ensure machine exists
record = {
'_method': 'update',
@@ -84,14 +84,14 @@ class TestMachineModel(TestCase):
'value': '/usr/bin/wget',
}
result = Machine.from_jsonl(record)
result = Machine.from_json(record)
self.assertIsNotNone(result)
self.assertEqual(result.config.get('WGET_BINARY'), '/usr/bin/wget')
def test_machine_from_jsonl_invalid(self):
"""Machine.from_jsonl() should return None for invalid records."""
result = Machine.from_jsonl({'invalid': 'record'})
"""Machine.from_json() should return None for invalid records."""
result = Machine.from_json({'invalid': 'record'})
self.assertIsNone(result)
def test_machine_manager_current(self):
@@ -254,14 +254,14 @@ class TestProcessModel(TestCase):
self.assertIsNone(process.exit_code)
def test_process_to_jsonl(self):
"""Process.to_jsonl() should serialize correctly."""
"""Process.to_json() should serialize correctly."""
process = Process.objects.create(
machine=self.machine,
cmd=['echo', 'hello'],
pwd='/tmp',
timeout=60,
)
json_data = process.to_jsonl()
json_data = process.to_json()
self.assertEqual(json_data['type'], 'Process')
self.assertEqual(json_data['cmd'], ['echo', 'hello'])