| 131 | | |
| 132 | | |
| 133 | | |
| | 134 | class AsyncWriter(threading.Thread, DeletionMixin): |
| | 135 | """Convenience wrapper for a writer object that might fail due to locking |
| | 136 | (i.e. the ``filedb`` writer). This object will attempt once to obtain the |
| | 137 | underlying writer, and if it's successful, will simply pass method calls |
| | 138 | on to it. |
| | 139 | |
| | 140 | If this object *can't* obtain a writer immediately, it will *buffer* delete, |
| | 141 | add, and update method calls in memory until you call ``commit()``. At that |
| | 142 | point, this object will start running in a separate thread, trying to obtain |
| | 143 | the writer over and over, and once it obtains it, "replay" all the |
| | 144 | buffered method calls on it. |
| | 145 | |
| | 146 | In a typical scenario where you're adding a single or a few documents |
| | 147 | to the index as the result of a Web transaction, this lets you just create |
| | 148 | the writer, add, and commit, without having to worry about index locks, |
| | 149 | retries, etc. |
| | 150 | |
| | 151 | The first argument is a callable which returns the actual writer. |
| | 152 | Usually this will be the ``writer`` method of your Index object. |
| | 153 | Any additional keyword arguments to the initializer are passed into |
| | 154 | the callable. |
| | 155 | |
| | 156 | For example, to get an aynchronous writer, instead of this: |
| | 157 | |
| | 158 | >>> writer = myindex.writer(postlimit=128 * 1024 * 1024) |
| | 159 | |
| | 160 | Do this: |
| | 161 | |
| | 162 | >>> from whoosh.writing import AsyncWriter |
| | 163 | >>> writer = AsyncWriter(myindex.writer, postlimit=128 * 1024 * 1024) |
| | 164 | """ |
| | 165 | |
| | 166 | def __init__(self, writerfn, delay=0.25, **writerargs): |
| | 167 | """ |
| | 168 | :param writerfn: a callable object (function or method) which returns |
| | 169 | the actual writer. |
| | 170 | :param delay: the delay (in seconds) between attempts to instantiate |
| | 171 | the actual writer. |
| | 172 | """ |
| | 173 | |
| | 174 | threading.Thread.__init__(self) |
| | 175 | self.running = False |
| | 176 | self.writerfn = writerfn |
| | 177 | self.writerargs = writerargs |
| | 178 | self.delay = delay |
| | 179 | self.events = [] |
| | 180 | try: |
| | 181 | self.writer = writerfn(**writerargs) |
| | 182 | except LockError: |
| | 183 | self.writer = None |
| | 184 | |
| | 185 | def _record(self, method, *args, **kwargs): |
| | 186 | if self.writer: |
| | 187 | getattr(self.writer, method)(*args, **kwargs) |
| | 188 | else: |
| | 189 | self.events.add(method, args, kwargs) |
| | 190 | |
| | 191 | def run(self): |
| | 192 | self.running = True |
| | 193 | writer = self.writer |
| | 194 | while writer is None: |
| | 195 | try: |
| | 196 | writer = self.writerfn(**self.writerargs) |
| | 197 | except LockError: |
| | 198 | time.sleep(self.delay) |
| | 199 | for method, args, kwargs in self.events: |
| | 200 | getattr(writer, method)(*args, **kwargs) |
| | 201 | writer.commit(*self.commitargs, **self.commitkwargs) |
| | 202 | |
| | 203 | def delete_document(self, docnum): |
| | 204 | self._record("delete_document", docnum) |
| | 205 | |
| | 206 | def add_document(self, *args, **kwargs): |
| | 207 | self._record("add_document", *args, **kwargs) |
| | 208 | |
| | 209 | def update_document(self, *args, **kwargs): |
| | 210 | self._record("update_document", *args, **kwargs) |
| | 211 | |
| | 212 | def commit(self, *args, **kwargs): |
| | 213 | if self.writer: |
| | 214 | self.writer.commit(*args, **kwargs) |
| | 215 | else: |
| | 216 | self.commitargs, self.commitkwargs = args, kwargs |
| | 217 | self.start() |
| | 218 | |
| | 219 | def cancel(self, *args, **kwargs): |
| | 220 | if self.writer: |
| | 221 | self.writer.cancel(*args, **kwargs) |
| | 222 | |
| | 223 | |
| | 224 | |
| | 225 | |
| | 226 | |
| | 227 | |
| | 228 | |
| | 229 | |