-
Notifications
You must be signed in to change notification settings - Fork 29
Expand file tree
/
Copy pathtest_server.py
More file actions
4789 lines (3888 loc) · 184 KB
/
Copy pathtest_server.py
File metadata and controls
4789 lines (3888 loc) · 184 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python3
"""
Tests for reMarkable MCP Server
Tests the 4 intent-based tools using FastMCP's testing capabilities.
"""
import json
import os
import sys
import tempfile
import zipfile
from pathlib import Path
from unittest.mock import AsyncMock, Mock, patch
import pytest
import requests
from remarkable_mcp.api import (
get_item_path,
get_items_by_id,
register_and_get_token,
)
from remarkable_mcp.extract import (
extract_text_from_document_zip,
extract_text_from_rm_file,
find_similar_documents,
)
from remarkable_mcp.responses import (
make_error,
make_response,
)
from remarkable_mcp.server import mcp
# =============================================================================
# Test Fixtures
# =============================================================================
@pytest.fixture
def mock_document():
"""Create a mock Document object."""
doc = Mock()
doc.VissibleName = "Test Document"
doc.ID = "doc-123"
doc.Parent = ""
doc.ModifiedClient = "2024-01-15T10:30:00Z"
return doc
@pytest.fixture
def mock_folder():
"""Create a mock Folder object."""
folder = Mock()
folder.VissibleName = "Test Folder"
folder.ID = "folder-456"
folder.Parent = ""
return folder
@pytest.fixture
def mock_collection(mock_document, mock_folder):
"""Create a mock collection of items."""
return [mock_document, mock_folder]
@pytest.fixture
def sample_zip_file():
"""Create a sample reMarkable document zip for testing."""
with tempfile.NamedTemporaryFile(suffix=".zip", delete=False) as tmp:
with zipfile.ZipFile(tmp.name, "w") as zf:
# Add a sample text file
zf.writestr("sample.txt", "This is sample text content")
# Add a sample content json
zf.writestr("metadata.content", '{"text": "Content metadata text"}')
yield Path(tmp.name)
Path(tmp.name).unlink(missing_ok=True)
# =============================================================================
# Test MCP Server Initialization
# =============================================================================
class TestMCPServerInitialization:
"""Test MCP server initialization and basic functionality."""
def test_server_name(self):
"""Test that server has correct name."""
assert mcp.name == "remarkable"
@pytest.mark.asyncio
async def test_tools_registered(self):
"""Test that all expected tools are registered."""
tools = await mcp.list_tools()
tool_names = [tool.name for tool in tools]
expected_tools = [
"remarkable_read",
"remarkable_browse",
"remarkable_recent",
"remarkable_search",
"remarkable_status",
"remarkable_image",
"remarkable_canvas",
]
for tool_name in expected_tools:
assert tool_name in tool_names, f"Tool {tool_name} not found"
@pytest.mark.asyncio
async def test_tools_count(self):
"""Cloud default: 6 read tools + always-on canvas + 5 write tools.
``remarkable_author`` is SSH-only and therefore hidden in cloud mode, so
the default (cloud) surface is 12 tools, not 13.
"""
tools = await mcp.list_tools()
assert len(tools) == 12, f"Expected 12 tools, got {len(tools)}"
@pytest.mark.asyncio
async def test_tool_schemas(self):
"""Test that tools have proper schemas."""
tools = await mcp.list_tools()
for tool in tools:
assert tool.name, "Tool should have a name"
assert tool.description, "Tool should have a description"
assert hasattr(tool, "inputSchema"), "Tool should have inputSchema"
@pytest.mark.asyncio
async def test_all_tools_have_xml_docstrings(self):
"""Test that all tools have XML-structured documentation."""
tools = await mcp.list_tools()
for tool in tools:
# Check for XML tags in description
desc = tool.description
assert "<usecase>" in desc, f"Tool {tool.name} missing <usecase> tag"
# =============================================================================
# Test Helper Functions
# =============================================================================
class TestHelperFunctions:
"""Test helper functions."""
def test_make_response(self):
"""Test response creation with hint."""
data = {"key": "value"}
result = make_response(data, "This is a hint")
parsed = json.loads(result)
assert parsed["key"] == "value"
assert parsed["_hint"] == "This is a hint"
def test_make_error(self):
"""Test error creation with suggestions."""
result = make_error(
error_type="test_error",
message="Something went wrong",
suggestion="Try this instead",
did_you_mean=["option1", "option2"],
)
parsed = json.loads(result)
assert parsed["_error"]["type"] == "test_error"
assert parsed["_error"]["message"] == "Something went wrong"
assert parsed["_error"]["suggestion"] == "Try this instead"
assert parsed["_error"]["did_you_mean"] == ["option1", "option2"]
def test_make_error_without_did_you_mean(self):
"""Test error creation without did_you_mean."""
result = make_error(
error_type="test_error", message="Error message", suggestion="Suggestion"
)
parsed = json.loads(result)
assert "did_you_mean" not in parsed["_error"]
def test_find_similar_documents(self):
"""Test fuzzy document matching."""
docs = [
Mock(VissibleName="Meeting Notes"),
Mock(VissibleName="Project Plan"),
Mock(VissibleName="Notes Daily"),
]
# Exact partial match
results = find_similar_documents("Notes", docs)
assert "Meeting Notes" in results or "Notes Daily" in results
# Fuzzy match
results = find_similar_documents("Meating", docs, limit=3)
assert len(results) <= 3
def test_get_items_by_id(self, mock_collection):
"""Test building ID lookup dict."""
items_by_id = get_items_by_id(mock_collection)
assert "doc-123" in items_by_id
assert "folder-456" in items_by_id
def test_get_item_path(self, mock_document, mock_collection):
"""Test getting full item path."""
items_by_id = get_items_by_id(mock_collection)
path = get_item_path(mock_document, items_by_id)
assert path == "/Test Document"
def test_get_item_path_nested(self, mock_folder):
"""Test getting path for nested item."""
# Create nested structure
child_doc = Mock()
child_doc.VissibleName = "Child Doc"
child_doc.ID = "child-789"
child_doc.Parent = mock_folder.ID
items_by_id = {mock_folder.ID: mock_folder, child_doc.ID: child_doc}
path = get_item_path(child_doc, items_by_id)
assert path == "/Test Folder/Child Doc"
# =============================================================================
# Test Text Extraction
# =============================================================================
class TestTextExtraction:
"""Test text extraction functions."""
def test_extract_text_from_document_zip(self, sample_zip_file):
"""Test extracting text from a zip file."""
result = extract_text_from_document_zip(sample_zip_file)
assert "typed_text" in result
assert "highlights" in result
assert "handwritten_text" in result
assert "pages" in result
# Should have extracted text from txt file
assert any("sample text" in text.lower() for text in result["typed_text"])
def test_extract_text_from_rm_file_no_rmscene(self):
"""Test graceful fallback when rmscene not available."""
# Create a dummy file
with tempfile.NamedTemporaryFile(suffix=".rm", delete=False) as tmp:
tmp.write(b"dummy data")
tmp_path = Path(tmp.name)
try:
# This should return empty list if rmscene fails
result = extract_text_from_rm_file(tmp_path)
assert isinstance(result, list)
finally:
tmp_path.unlink(missing_ok=True)
# =============================================================================
# Test remarkable_status Tool
# =============================================================================
class TestRemarkableStatus:
"""Test remarkable_status tool."""
@pytest.mark.asyncio
@patch("remarkable_mcp.tools.get_rmapi")
async def test_status_authenticated(self, mock_get_rmapi):
"""Test status when authenticated."""
mock_client = Mock()
mock_get_rmapi.return_value = mock_client
mock_client.get_meta_items.return_value = []
result = await mcp.call_tool("remarkable_status", {})
data = json.loads(result[0][0].text)
assert data["authenticated"] is True
assert "transport" in data
assert "connection" in data
assert data["status"] == "connected"
assert "_hint" in data
@pytest.mark.asyncio
@patch("remarkable_mcp.tools.get_rmapi")
async def test_status_capability_matrix(self, mock_get_rmapi):
"""Status exposes per-transport and effective capability matrices."""
mock_client = Mock()
mock_get_rmapi.return_value = mock_client
mock_client.get_meta_items.return_value = []
result = await mcp.call_tool("remarkable_status", {})
data = json.loads(result[0][0].text)
assert "write_enabled" in data
assert "capabilities" in data
assert "capabilities_by_transport" in data
matrix = data["capabilities_by_transport"]
# All three transports are described.
assert set(matrix) == {"cloud", "ssh", "usb-web"}
# Read/render are universal.
for caps in matrix.values():
assert caps["read"] is True
assert caps["render"] is True
# Cloud and SSH have full write surface; USB web is upload-only.
for mode in ("cloud", "ssh"):
assert all(matrix[mode][op] for op in ("upload", "mkdir", "move", "rename", "delete"))
assert matrix["usb-web"]["upload"] is True
assert not any(matrix["usb-web"][op] for op in ("mkdir", "move", "rename", "delete"))
# Effective capabilities for the active transport always cover read/render.
assert data["capabilities"]["read"] is True
assert data["capabilities"]["render"] is True
@pytest.mark.asyncio
@patch("remarkable_mcp.tools.get_rmapi")
async def test_status_not_authenticated(self, mock_get_rmapi):
"""Test status when not authenticated."""
mock_get_rmapi.side_effect = RuntimeError("Failed to authenticate")
result = await mcp.call_tool("remarkable_status", {})
data = json.loads(result[0][0].text)
assert data["authenticated"] is False
assert "error" in data
assert "_hint" in data
# Hint should include registration instructions or SSH mode
assert "register" in data["_hint"].lower() or "ssh" in data["_hint"].lower()
@pytest.mark.asyncio
@patch("remarkable_mcp.tools.get_rmapi")
async def test_status_reports_cloud_after_fallback(self, mock_get_rmapi, monkeypatch):
"""When USB/SSH falls back to cloud, status reports the effective transport."""
import remarkable_mcp.api as api
monkeypatch.setattr(api, "REMARKABLE_USE_USB_WEB", True)
monkeypatch.setattr(api, "REMARKABLE_USE_SSH", False)
# Simulate that get_rmapi() already resolved to a cloud fallback.
monkeypatch.setattr(api, "get_active_transport", lambda: "cloud")
mock_client = Mock()
mock_client.get_meta_items.return_value = []
mock_get_rmapi.return_value = mock_client
result = await mcp.call_tool("remarkable_status", {})
data = json.loads(result[0][0].text)
assert data["authenticated"] is True
assert data["transport"] == "cloud"
assert data["fell_back_to_cloud"] is True
# Effective capabilities reflect cloud (full write surface when enabled).
assert data["capabilities"]["read"] is True
assert "usb-web" in data["_hint"] # mentions what it fell back from
assert "fell back" in data["_hint"].lower()
# =============================================================================
# Test remarkable_browse Tool
# =============================================================================
class TestRemarkableBrowse:
"""Test remarkable_browse tool."""
@pytest.mark.asyncio
@patch("remarkable_mcp.tools.get_rmapi")
async def test_browse_root(self, mock_get_rmapi):
"""Test browsing root folder."""
mock_client = Mock()
mock_get_rmapi.return_value = mock_client
mock_client.get_meta_items.return_value = []
result = await mcp.call_tool("remarkable_browse", {"path": "/"})
data = json.loads(result[0][0].text)
assert data["mode"] == "browse"
assert data["path"] == "/"
assert "_hint" in data
@pytest.mark.asyncio
@patch("remarkable_mcp.tools.get_rmapi")
async def test_browse_search_mode(self, mock_get_rmapi):
"""Test search mode."""
mock_client = Mock()
mock_get_rmapi.return_value = mock_client
# Create mock items that have VissibleName
mock_doc = Mock()
mock_doc.VissibleName = "Test Document"
mock_doc.ID = "doc-123"
mock_doc.Parent = ""
mock_doc.ModifiedClient = "2024-01-15"
mock_client.get_meta_items.return_value = [mock_doc]
result = await mcp.call_tool("remarkable_browse", {"query": "Test"})
data = json.loads(result[0][0].text)
assert data["mode"] == "search"
assert data["query"] == "Test"
assert "results" in data
assert "_hint" in data
@pytest.mark.asyncio
@patch("remarkable_mcp.tools.get_rmapi")
async def test_browse_error_handling(self, mock_get_rmapi):
"""Test error handling in browse."""
mock_get_rmapi.side_effect = RuntimeError("Connection failed")
result = await mcp.call_tool("remarkable_browse", {"path": "/"})
data = json.loads(result[0][0].text)
assert "_error" in data
assert data["_error"]["type"] == "browse_failed"
# =============================================================================
# Test remarkable_recent Tool
# =============================================================================
class TestRemarkableRecent:
"""Test remarkable_recent tool."""
@pytest.mark.asyncio
@patch("remarkable_mcp.tools.get_rmapi")
async def test_recent_default_limit(self, mock_get_rmapi):
"""Test getting recent documents with default limit."""
mock_client = Mock()
mock_get_rmapi.return_value = mock_client
mock_client.get_meta_items.return_value = []
result = await mcp.call_tool("remarkable_recent", {})
data = json.loads(result[0][0].text)
assert "count" in data
assert "documents" in data
assert "_hint" in data
@pytest.mark.asyncio
@patch("remarkable_mcp.tools.get_rmapi")
async def test_recent_custom_limit(self, mock_get_rmapi):
"""Test getting recent documents with custom limit."""
mock_client = Mock()
mock_get_rmapi.return_value = mock_client
mock_client.get_meta_items.return_value = []
result = await mcp.call_tool("remarkable_recent", {"limit": 5})
data = json.loads(result[0][0].text)
assert "count" in data
assert "documents" in data
@pytest.mark.asyncio
@patch("remarkable_mcp.tools.get_rmapi")
async def test_recent_limit_clamped(self, mock_get_rmapi):
"""Test that limit is clamped to valid range."""
mock_client = Mock()
mock_get_rmapi.return_value = mock_client
mock_client.get_meta_items.return_value = []
# Test with limit > 50
result = await mcp.call_tool("remarkable_recent", {"limit": 100})
# Should not raise an error
data = json.loads(result[0][0].text)
assert "count" in data
@pytest.mark.asyncio
@patch("remarkable_mcp.tools.get_rmapi")
async def test_recent_error_handling(self, mock_get_rmapi):
"""Test error handling in recent."""
mock_get_rmapi.side_effect = RuntimeError("Connection failed")
result = await mcp.call_tool("remarkable_recent", {})
data = json.loads(result[0][0].text)
assert "_error" in data
assert data["_error"]["type"] == "recent_failed"
@pytest.mark.asyncio
@patch("remarkable_mcp.tools.get_rmapi")
async def test_recent_include_preview_does_not_crash(self, mock_get_rmapi):
"""Test that include_preview=True works without AttributeError on download result.
This is a regression test for the bug where client.download() returns bytes
but the code called raw_doc.content (treating it like a requests.Response).
"""
import io
mock_client = Mock()
mock_get_rmapi.return_value = mock_client
# Create a PDF document mock
doc = Mock()
doc.VissibleName = "My PDF"
doc.ID = "pdf-123"
doc.Parent = ""
doc.ModifiedClient = "2024-01-15T10:30:00Z"
doc.is_folder = False
doc.tags = []
mock_client.get_meta_items.return_value = [doc]
# download() returns bytes (not a requests.Response)
zip_buffer = io.BytesIO()
with zipfile.ZipFile(zip_buffer, "w") as zf:
zf.writestr("pdf-123.content", '{"fileType": "pdf"}')
mock_client.download.return_value = zip_buffer.getvalue()
# Simulate get_file_type returning "pdf"
with patch("remarkable_mcp.tools.get_file_type", return_value="pdf"):
result = await mcp.call_tool("remarkable_recent", {"include_preview": True})
data = json.loads(result[0][0].text)
# Should not crash with AttributeError; may return empty preview but no error
assert "_error" not in data
assert "documents" in data
@pytest.mark.asyncio
@patch("remarkable_mcp.tools.get_rmapi")
async def test_recent_handles_null_and_mixed_modified_dates(self, mock_get_rmapi):
"""Regression test for #96: remarkable_recent must not crash when documents
have a null modified date, nor when modified dates mix tz-aware (USB) and
tz-naive (cloud/SSH) datetimes.
Previously the sort key returned "" (str) for missing dates and a datetime
otherwise, raising TypeError ('<' not supported between str and datetime).
A tz-aware sentinel would still crash cloud/SSH (naive vs aware compare).
"""
from datetime import datetime, timezone
class FakeDoc:
def __init__(self, doc_id, name, modified):
self.ID = doc_id
self.VissibleName = name
self.Parent = ""
self.is_folder = False
self.tags = []
self.ModifiedClient = modified
mock_client = Mock()
mock_get_rmapi.return_value = mock_client
# newest: tz-aware (USB style); middle: tz-naive (cloud style); oldest: None
mock_client.get_meta_items.return_value = [
FakeDoc("a", "Naive Middle", datetime(2024, 6, 1, 12, 0, 0)),
FakeDoc("b", "Fresh Notebook", None),
FakeDoc("c", "Aware Newest", datetime(2025, 1, 1, 0, 0, 0, tzinfo=timezone.utc)),
]
result = await mcp.call_tool("remarkable_recent", {})
data = json.loads(result[0][0].text)
assert "_error" not in data
names = [d["name"] for d in data["documents"]]
assert names == ["Aware Newest", "Naive Middle", "Fresh Notebook"]
# =============================================================================
# Test remarkable_search Tool
# =============================================================================
class TestRemarkableSearch:
"""Test remarkable_search tool."""
@pytest.fixture(autouse=True)
def _clear_root_path(self, monkeypatch):
# Ensure tests are independent of any ambient REMARKABLE_ROOT_PATH
monkeypatch.delenv("REMARKABLE_ROOT_PATH", raising=False)
@pytest.mark.asyncio
@patch("remarkable_mcp.tools.get_rmapi")
async def test_search_awaits_remarkable_read(self, mock_get_rmapi):
"""Regression test: remarkable_search must await the async remarkable_read.
Previously remarkable_search was a sync function calling the async
remarkable_read without awaiting it, resulting in per-document errors:
'the JSON object must be str, bytes or bytearray, not coroutine'
because json.loads() was called on a coroutine object.
"""
mock_client = Mock()
mock_get_rmapi.return_value = mock_client
doc = Mock()
doc.VissibleName = "mcp notes"
doc.ID = "doc-mcp-1"
doc.Parent = ""
doc.ModifiedClient = "2024-01-15T10:30:00Z"
doc.is_folder = False
doc.is_cloud_archived = False
doc.tags = []
mock_client.get_meta_items.return_value = [doc]
# Minimal zip so remarkable_read can succeed end-to-end
import io
zip_buffer = io.BytesIO()
with zipfile.ZipFile(zip_buffer, "w") as zf:
zf.writestr("doc-mcp-1.content", '{"fileType": "notebook"}')
mock_client.download.return_value = zip_buffer.getvalue()
result = await mcp.call_tool("remarkable_search", {"query": "mcp", "limit": 2})
data = json.loads(result[0][0].text)
# Top-level call must succeed (not surface a coroutine TypeError)
assert "_error" not in data, f"Unexpected top-level error: {data.get('_error')}"
assert "documents" in data
assert data["count"] >= 1
# No per-document result should contain the coroutine error
for doc_result in data["documents"]:
err = doc_result.get("error", "")
assert "coroutine" not in err, f"Per-document coroutine error leaked through: {err}"
@pytest.mark.asyncio
@patch("remarkable_mcp.tools.get_rmapi")
async def test_search_no_documents_found(self, mock_get_rmapi):
"""Test search returns clean error when no documents match."""
mock_client = Mock()
mock_get_rmapi.return_value = mock_client
mock_client.get_meta_items.return_value = []
result = await mcp.call_tool("remarkable_search", {"query": "nonexistent-xyz"})
data = json.loads(result[0][0].text)
assert "_error" in data
assert data["_error"]["type"] == "no_documents_found"
# =============================================================================
# Test remarkable_read Tool
# =============================================================================
class TestRemarkableRead:
"""Test remarkable_read tool."""
@pytest.mark.asyncio
@patch("remarkable_mcp.tools.get_rmapi")
async def test_read_document_not_found(self, mock_get_rmapi):
"""Test reading a non-existent document."""
mock_client = Mock()
mock_get_rmapi.return_value = mock_client
mock_client.get_meta_items.return_value = []
result = await mcp.call_tool("remarkable_read", {"document": "NonExistent"})
data = json.loads(result[0][0].text)
assert "_error" in data
assert data["_error"]["type"] == "document_not_found"
assert "suggestion" in data["_error"]
@pytest.mark.asyncio
@patch("remarkable_mcp.tools.get_rmapi")
async def test_read_error_handling(self, mock_get_rmapi):
"""Test error handling in read."""
mock_get_rmapi.side_effect = RuntimeError("Connection failed")
result = await mcp.call_tool("remarkable_read", {"document": "Test"})
data = json.loads(result[0][0].text)
assert "_error" in data
assert data["_error"]["type"] == "read_failed"
@pytest.mark.asyncio
@patch("remarkable_mcp.tools.get_rmapi")
async def test_read_provides_suggestions(self, mock_get_rmapi, mock_document):
"""Test that read provides 'did you mean' suggestions."""
mock_client = Mock()
mock_get_rmapi.return_value = mock_client
mock_client.get_meta_items.return_value = [mock_document]
# Search for something similar but not exact
result = await mcp.call_tool("remarkable_read", {"document": "Test Doc"})
data = json.loads(result[0][0].text)
# Should get a not found error with suggestions
assert "_error" in data
assert data["_error"]["type"] == "document_not_found"
@pytest.mark.asyncio
@patch("remarkable_mcp.tools.get_rmapi")
async def test_read_notebook_empty_content_ocr_retry(self, mock_get_rmapi):
"""Test that remarkable_read correctly awaits the OCR auto-retry for empty notebooks.
This is a regression test for the bug where the recursive call to
remarkable_read() was missing 'await', causing a coroutine object to be
passed to json.loads() with the error:
'the JSON object must be str, bytes or bytearray, not coroutine'
"""
mock_client = Mock()
mock_get_rmapi.return_value = mock_client
# Create a notebook document mock
doc = Mock()
doc.VissibleName = "Quick sheets"
doc.ID = "notebook-123"
doc.Parent = ""
doc.ModifiedClient = "2024-01-15T10:30:00Z"
doc.is_folder = False
doc.tags = []
mock_client.get_meta_items.return_value = [doc]
# Create a minimal zip with no typed text (simulates a handwritten notebook)
import io
zip_buffer = io.BytesIO()
with zipfile.ZipFile(zip_buffer, "w") as zf:
# Add empty content file (no text field) to simulate notebook
zf.writestr("notebook-123.content", '{"fileType": "notebook"}')
zip_bytes = zip_buffer.getvalue()
mock_client.download.return_value = zip_bytes
# This should NOT raise "the JSON object must be str, bytes or bytearray, not coroutine"
# Previously failed because remarkable_read() was called without 'await'
result = await mcp.call_tool("remarkable_read", {"document": "Quick sheets"})
data = json.loads(result[0][0].text)
# Should return a valid response (not a coroutine error)
assert (
"_error" not in data
or data["_error"]["type"] != "read_failed"
or ("coroutine" not in data["_error"].get("message", ""))
), f"Got coroutine error: {data}"
# =============================================================================
# Test remarkable_image Tool
# =============================================================================
class TestRemarkableImage:
"""Test remarkable_image tool."""
@pytest.mark.asyncio
@patch("remarkable_mcp.tools.get_rmapi")
async def test_image_document_not_found(self, mock_get_rmapi):
"""Test getting image from non-existent document."""
mock_client = Mock()
mock_get_rmapi.return_value = mock_client
mock_client.get_meta_items.return_value = []
result = await mcp.call_tool("remarkable_image", {"document": "NonExistent"})
data = json.loads(result[0].text)
assert "_error" in data
assert data["_error"]["type"] == "document_not_found"
assert "suggestion" in data["_error"]
@pytest.mark.asyncio
@patch("remarkable_mcp.tools.get_rmapi")
async def test_image_error_handling(self, mock_get_rmapi):
"""Test error handling in image tool."""
mock_get_rmapi.side_effect = RuntimeError("Connection failed")
result = await mcp.call_tool("remarkable_image", {"document": "Test"})
data = json.loads(result[0].text)
assert "_error" in data
assert data["_error"]["type"] == "image_failed"
@pytest.mark.asyncio
@patch("remarkable_mcp.tools.get_rmapi")
async def test_image_provides_suggestions(self, mock_get_rmapi, mock_document):
"""Test that image tool provides 'did you mean' suggestions."""
mock_client = Mock()
mock_get_rmapi.return_value = mock_client
mock_client.get_meta_items.return_value = [mock_document]
# Search for something similar but not exact
result = await mcp.call_tool("remarkable_image", {"document": "Test Doc"})
data = json.loads(result[0].text)
# Should get a not found error with suggestions
assert "_error" in data
assert data["_error"]["type"] == "document_not_found"
@pytest.mark.asyncio
async def test_image_compatibility_parameter_in_schema(self):
"""Test that remarkable_image tool has the compatibility parameter in its schema."""
tools = await mcp.list_tools()
image_tool = next(t for t in tools if t.name == "remarkable_image")
# Check that compatibility parameter exists in the input schema
assert "compatibility" in image_tool.inputSchema.get("properties", {})
compat_schema = image_tool.inputSchema["properties"]["compatibility"]
assert compat_schema.get("type") == "boolean"
assert compat_schema.get("default") is False
# =============================================================================
# Test Merged Rendering
# =============================================================================
class TestMergedRendering:
"""Test render_merged parameter for remarkable_image."""
@pytest.mark.asyncio
async def test_render_merged_parameter_in_schema(self):
"""Test that remarkable_image tool has the render_merged parameter in its schema."""
tools = await mcp.list_tools()
image_tool = next(t for t in tools if t.name == "remarkable_image")
# Check that render_merged parameter exists in the input schema
assert "render_merged" in image_tool.inputSchema.get("properties", {})
merged_schema = image_tool.inputSchema["properties"]["render_merged"]
assert merged_schema.get("type") == "boolean"
assert merged_schema.get("default") is False
@pytest.mark.asyncio
@patch("remarkable_mcp.tools.get_rmapi")
@patch("remarkable_mcp.tools.render_merged_page_from_document_zip")
@patch("remarkable_mcp.tools.get_document_page_count")
async def test_render_merged_fallback_no_pdf(
self,
mock_page_count,
mock_render_merged,
mock_get_rmapi,
mock_document,
):
"""Test render_merged falls back when document has no PDF underlay."""
mock_client = Mock()
mock_get_rmapi.return_value = mock_client
mock_document.is_folder = False
mock_client.get_meta_items.return_value = [mock_document]
mock_client.download.return_value = b"fake zip"
mock_page_count.return_value = 3
# Simulate merged function returning annotation-only with a note (no PDF)
fake_png = b"\x89PNG\r\n\x1a\n" + b"\x00" * 50
mock_render_merged.return_value = (
fake_png,
"No PDF underlay found; returned annotation-only render.",
)
with patch("tempfile.NamedTemporaryFile") as mock_tmpfile:
mock_tmp = Mock()
mock_tmp.__enter__ = Mock(return_value=mock_tmp)
mock_tmp.__exit__ = Mock(return_value=False)
mock_tmp.name = "/tmp/test.zip"
mock_tmpfile.return_value = mock_tmp
with patch("pathlib.Path.unlink"):
result = await mcp.call_tool(
"remarkable_image",
{
"document": "Test Document",
"render_merged": True,
"compatibility": True,
},
)
data = json.loads(result[0].text)
# Should fall back to annotation-only since no PDF underlay
assert data.get("merged") is False
@pytest.mark.asyncio
@patch("remarkable_mcp.tools.get_rmapi")
async def test_render_merged_svg_ignored(self, mock_get_rmapi, mock_document):
"""Test that SVG format ignores render_merged gracefully."""
mock_client = Mock()
mock_get_rmapi.return_value = mock_client
mock_document.is_folder = False
mock_client.get_meta_items.return_value = [mock_document]
mock_client.download.return_value = b"fake zip"
with (
patch("tempfile.NamedTemporaryFile") as mock_tmpfile,
patch("remarkable_mcp.tools.get_document_page_count", return_value=2),
patch(
"remarkable_mcp.tools.render_page_from_document_zip_svg",
return_value='<svg xmlns="http://www.w3.org/2000/svg"></svg>',
),
):
mock_tmp = Mock()
mock_tmp.__enter__ = Mock(return_value=mock_tmp)
mock_tmp.__exit__ = Mock(return_value=False)
mock_tmp.name = "/tmp/test.zip"
mock_tmpfile.return_value = mock_tmp
with patch("pathlib.Path.unlink"):
result = await mcp.call_tool(
"remarkable_image",
{
"document": "Test Document",
"output_format": "svg",
"render_merged": True,
"compatibility": True,
},
)
data = json.loads(result[0].text)
# SVG should return successfully but merged=False
assert data.get("merged") is False
assert "render_merged is only supported with PNG" in data.get("_hint", "")
@pytest.mark.asyncio
@patch("remarkable_mcp.tools.get_rmapi")
@patch("remarkable_mcp.tools.render_merged_page_from_document_zip")
@patch("remarkable_mcp.tools.get_document_page_count")
async def test_render_merged_success_path(
self,
mock_page_count,
mock_render_merged,
mock_get_rmapi,
mock_document,
):
"""Test render_merged success path: returns merged PNG with .merged.png URI."""
mock_client = Mock()
mock_get_rmapi.return_value = mock_client
mock_document.is_folder = False
mock_client.get_meta_items.return_value = [mock_document]
mock_client.download.return_value = b"fake zip"
mock_page_count.return_value = 5
# Simulate successful merged render (no fallback note)
composited_png = b"\x89PNG\r\n\x1a\n" + b"\x00" * 200
mock_render_merged.return_value = (composited_png, None)
with patch("tempfile.NamedTemporaryFile") as mock_tmpfile:
mock_tmp = Mock()
mock_tmp.__enter__ = Mock(return_value=mock_tmp)
mock_tmp.__exit__ = Mock(return_value=False)
mock_tmp.name = "/tmp/test.zip"
mock_tmpfile.return_value = mock_tmp
with patch("pathlib.Path.unlink"):
result = await mcp.call_tool(
"remarkable_image",
{
"document": "Test Document",
"render_merged": True,
"compatibility": True,
},
)
data = json.loads(result[0].text)
assert data.get("merged") is True
assert data.get("resource_uri", "").endswith(".merged.png")
hint = data.get("_hint", "")
assert "PDF" in hint and "annotation" in hint.lower()
# The merged renderer should have been called once for this page
assert mock_render_merged.called
def _make_synthetic_pdf(pages: int = 2) -> bytes:
"""Build a small multi-page PDF in memory for fallback tests."""
import fitz
doc = fitz.open()
for _ in range(pages):
# reMarkable-ish portrait page dimensions in points
doc.new_page(width=445, height=594)
data = doc.tobytes()
doc.close()
return data
class TestRenderTabletPdfFallback:
"""Tests for the portable tablet-PDF render fallback (issue #95/#102/#94)."""
def test_render_tablet_pdf_page_to_png_returns_png(self):
"""A valid PDF page rasterizes to PNG bytes."""
from remarkable_mcp.extract import render_tablet_pdf_page_to_png
pdf = _make_synthetic_pdf(2)
png = render_tablet_pdf_page_to_png(pdf, page=1)
assert png is not None
assert png.startswith(b"\x89PNG\r\n\x1a\n")
def test_render_tablet_pdf_page_out_of_range(self):
"""Out-of-range page returns None rather than raising."""
from remarkable_mcp.extract import render_tablet_pdf_page_to_png
pdf = _make_synthetic_pdf(1)
assert render_tablet_pdf_page_to_png(pdf, page=5) is None
assert render_tablet_pdf_page_to_png(pdf, page=0) is None
def test_render_tablet_pdf_invalid_bytes(self):
"""Invalid PDF bytes return None rather than raising."""
from remarkable_mcp.extract import render_tablet_pdf_page_to_png
assert render_tablet_pdf_page_to_png(b"not a pdf", page=1) is None
@pytest.mark.asyncio
@patch("remarkable_mcp.tools.download_raw_file")
@patch("remarkable_mcp.tools.render_page_from_document_zip")
@patch("remarkable_mcp.tools.get_document_page_count")
@patch("remarkable_mcp.tools.get_rmapi")
async def test_image_falls_back_to_tablet_pdf(
self,
mock_get_rmapi,
mock_page_count,
mock_render,
mock_download_raw,
mock_document,
):